幸运礼物-mq异步结算,Lucky24Record加上roomUid和messId

This commit is contained in:
khalil
2025-01-10 17:57:49 +08:00
parent a318876075
commit a4b9d4a7bd
13 changed files with 364 additions and 22 deletions

View File

@@ -1355,6 +1355,8 @@ public enum RedisKey {
lucky_24_user_history,
lucky_24_user_lock,
lucky_24_robot_push_msg,
lucky_24_status, // 礼物消息的状态
lock_lucky_24_message, // 消费送礼物消息锁
family_diamond_settlement,

View File

@@ -16,6 +16,7 @@ public class Lucky24Record {
private Integer partitionId;
private Long uid;
private Long receiverUid;
private Long roomUid;
private Integer giftId;
private Long giftGoldPrice;
private Integer giftNum;
@@ -25,4 +26,5 @@ public class Lucky24Record {
private Integer afterMultiple;
private Long winGoldNum;
private Date createTime;
private String messId;
}

View File

@@ -0,0 +1,27 @@
package com.accompany.business.message;
import com.accompany.mq.model.BaseMqMessage;
import lombok.Data;
import java.util.Date;
/**
* 礼物消息
*/
@Data
public class Lucky24Message extends BaseMqMessage {
private Integer partitionId;
private Long uid;
private Long receiverUid;
private Long roomUid;
private Integer giftId;
private Long giftGoldPrice;
private Integer giftNum;
private Integer poolId;
private Boolean isSupplement;
private Integer drawMultiple;
private Integer afterMultiple;
private Long winGoldNum;
private Long createTime;
private String messId;
}

View File

@@ -3,28 +3,35 @@ package com.accompany.business.service.gift;
import com.accompany.business.dto.lucky.Lucky24GiftConfig;
import com.accompany.business.dto.lucky.Lucky24Result;
import com.accompany.business.dto.lucky.SuperLuckyGiftIncomeAllot;
import com.accompany.business.message.GiftMessage;
import com.accompany.business.message.Lucky24Message;
import com.accompany.business.model.Gift;
import com.accompany.business.model.UserPurse;
import com.accompany.business.model.lucky.Lucky24Pool;
import com.accompany.business.service.lucky.*;
import com.accompany.business.service.mq.RocketMQService;
import com.accompany.common.constant.Constant;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.status.BusiStatus;
import com.accompany.core.exception.ServiceException;
import com.accompany.core.model.Room;
import com.accompany.core.model.Users;
import com.accompany.core.service.SysConfService;
import com.accompany.core.service.common.JedisService;
import com.accompany.core.util.DoubleUtil;
import com.accompany.sharding.model.Lucky24Record;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.incrementer.DefaultIdentifierGenerator;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RDeque;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@@ -47,11 +54,14 @@ public class Lucky24GiftSendService {
private Lucky24SettlementService settlementService;
@Autowired
private Lucky24RobotMsgService robotMsgService;
@Autowired
private RocketMQService rocketMQService;
@Autowired
private JedisService jedisService;
public void draw(long senderUid, Users sender, Integer partitionId, Room room, List<Long> receiveList,
Gift gift, int everyGiftNum, Date sendGiftTime) {
int totalGiftNum = everyGiftNum * receiveList.size();
long everyoneGoldNum = everyGiftNum * gift.getGoldPrice();
Lucky24GiftConfig config = getConfig();
@@ -63,14 +73,82 @@ public class Lucky24GiftSendService {
log.info("[lucky24] uid {}, partitionId {}, addStockGoldNum {}, afterStock {}",
senderUid, partitionId, incomeAllot.getRemainValue(), afterStock);
Map<Long, Long> winGoldNumMap = receiveList.parallelStream()
Map<Long, Lucky24Record> recordMap = receiveList.parallelStream()
.collect(Collectors.toMap(receiverUid-> receiverUid,
receiverUid-> drawMultiple2(config, partitionConfig, senderUid, sender, partitionId, gift, everyGiftNum, receiverUid, everyoneGoldNum, room, sendGiftTime)));
log.info("[lucky24] uid {}, totalWinGoldNum {}", senderUid, JSON.toJSONString(recordMap));
sendMq(recordMap);
/*Map<Long, Long> winGoldNumMap = receiveList.parallelStream()
.collect(Collectors.toMap(receiverUid-> receiverUid,
receiverUid-> drawMultiple(config, partitionConfig, senderUid, sender, partitionId, gift, everyGiftNum, receiverUid, everyoneGoldNum, room, sendGiftTime)));
log.info("[lucky24] uid {}, totalWinGoldNUm {}", senderUid, JSON.toJSONString(winGoldNumMap));
settlementService.settlement(config, senderUid, room, gift, everyGiftNum, totalGiftNum, winGoldNumMap, incomeAllot, sendGiftTime);
settlementService.settlement(config, senderUid, room, gift, everyGiftNum, totalGiftNum, winGoldNumMap, incomeAllot, sendGiftTime);*/
}
private void sendMq(Map<Long, Lucky24Record> recordMap) {
Map<String, String> caches = new HashMap<>(recordMap.size());
List<Lucky24Message> messageList = new ArrayList<>();
DefaultIdentifierGenerator idGenerator = DefaultIdentifierGenerator.getInstance();
for (Lucky24Record record: recordMap.values()){
String id = idGenerator.nextUUID(null);
Lucky24Message message = new Lucky24Message();
message.setMessId(id);
message.setPartitionId(record.getPartitionId());
message.setUid(record.getUid());
message.setReceiverUid(record.getReceiverUid());
message.setRoomUid(record.getRoomUid());
message.setGiftId(record.getGiftId());
message.setGiftGoldPrice(record.getGiftGoldPrice());
message.setGiftNum(record.getGiftNum());
message.setPoolId(record.getPoolId());
message.setIsSupplement(record.getIsSupplement());
message.setDrawMultiple(record.getDrawMultiple());
message.setAfterMultiple(record.getAfterMultiple());
message.setWinGoldNum(record.getWinGoldNum());
message.setCreateTime(record.getCreateTime().getTime());
messageList.add(message);
caches.put(id, JSON.toJSONString(message));
}
jedisService.hwrite(RedisKey.lucky_24_status.getKey(), caches);
rocketMQService.asyncSendBatchLucky24Message(messageList);
}
public Lucky24Record drawMultiple2(Lucky24GiftConfig config, Lucky24GiftConfig partitionConfig, long senderUid, Users sender, int partitionId,
Gift gift, int giftNum, long receiverUid, long everyoneGoldNum, Room room, Date sendGiftTime) {
Lucky24Result drawResult = poolService.drawMultipleFromPool(config, senderUid, partitionId);
long curTimes = userMetaService.updateUserTimes(senderUid);
Long supplementMultiple = judgeSupplement(config, partitionConfig.getSupplement(), senderUid, receiverUid, curTimes, drawResult);
long drawMultiple = null != supplementMultiple? supplementMultiple: drawResult.getOutput();
long afterMultiple = drawMultiple;
if (drawMultiple > 0){
BigDecimal winGoldNum = BigDecimal.valueOf(everyoneGoldNum * drawMultiple);
BigDecimal afterStock = stockService.subStock(partitionId, winGoldNum);
if (afterStock.compareTo(BigDecimal.ZERO) < 0){
log.info("[lucky24] drawMultiple sender {} receiver {} 产出大于库存 winGoldNum {} afterStock {}",
senderUid, receiverUid, winGoldNum, afterStock);
afterStock = stockService.addStock(partitionId, winGoldNum);
robotMsgService.pushStockNotEnough(partitionId, afterStock);
afterMultiple = 0;
}
}
long winGoldNum = afterMultiple * everyoneGoldNum;
userMetaService.updateUserMeta(config, senderUid, partitionId, curTimes, everyoneGoldNum, winGoldNum);
return recordService.buildRecord(senderUid, sender.getPartitionId(), gift, giftNum, null != room? room.getUid(): null,
receiverUid, drawResult.getPoolId(), null != supplementMultiple || Boolean.TRUE.equals(drawResult.getIsSupplement()),
drawMultiple, afterMultiple, sendGiftTime);
}
@Deprecated
private long drawMultiple(Lucky24GiftConfig config, Lucky24GiftConfig partitionConfig, long senderUid, Users sender, int partitionId,
Gift gift, int giftNum, long receiverUid, long everyoneGoldNum, Room room, Date sendGiftTime) {
Lucky24Result drawResult = poolService.drawMultipleFromPool(config, senderUid, partitionId);

View File

@@ -0,0 +1,124 @@
package com.accompany.business.service.gift;
import com.accompany.business.dto.lucky.Lucky24GiftConfig;
import com.accompany.business.dto.lucky.SuperLuckyGiftIncomeAllot;
import com.accompany.business.message.Lucky24Message;
import com.accompany.business.model.Gift;
import com.accompany.business.service.lucky.Lucky24IncomeAllotService;
import com.accompany.business.service.lucky.Lucky24RecordService;
import com.accompany.business.service.lucky.Lucky24RobotMsgService;
import com.accompany.business.service.lucky.Lucky24SettlementService;
import com.accompany.business.service.room.RoomService;
import com.accompany.business.service.user.UsersService;
import com.accompany.common.redis.RedisKey;
import com.accompany.core.model.Room;
import com.accompany.core.model.Users;
import com.accompany.core.service.base.BaseService;
import com.accompany.sharding.model.Lucky24Record;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.Date;
@Slf4j
@Service
public class Lucky24MessageService extends BaseService {
@Autowired
private Lucky24RecordService recordService;
@Autowired
private Lucky24IncomeAllotService incomeAllotService;
@Autowired
private SuperLuckyGiftSendService superLuckyGiftSendService;
@Autowired
private Lucky24SettlementService settlementService;
@Autowired
private Lucky24RobotMsgService robotMsgService;
@Autowired
private Lucky24GiftSendService sendService;
@Autowired
private RoomService roomService;
@Autowired
private GiftService giftService;
@Autowired
private UsersService usersService;
public void handleGiftMessage(Lucky24Message giftMessage) {
// 防止消息被重复消费
if (!jedisLockService.isExist(RedisKey.lock_lucky_24_message.getKey(giftMessage.getMessId()), 30)) {
logger.warn("handleLucky24Message giftMessage had handle, mess: " + giftMessage);
return;
}
if (!jedisService.hexists(RedisKey.lucky_24_status.getKey(), giftMessage.getMessId())){
logger.warn("handleLucky24Message giftMessage had handle, mess: " + giftMessage);
return;
}
logger.info("【处理lucky24 mq】 开始处理 giftMessage: {}", JSON.toJSONString(giftMessage));
Room room = null != giftMessage.getRoomUid()? roomService.getRoomByUid(giftMessage.getRoomUid()): null;
Gift gift = giftService.getGiftById(giftMessage.getGiftId());
Date createTime = new Date(giftMessage.getCreateTime());
Lucky24Record record = insertRecord(giftMessage);
log.info("【处理lucky24 mq】 record 插入成功 messId:{} recordId:{} record:{}",
giftMessage.getMessId(), record.getId(), JSON.toJSONString(record));
// 收礼者收益
Lucky24GiftConfig config = sendService.getConfig();
Lucky24GiftConfig partitionConfig = config.getRatioByPartitionId(giftMessage.getPartitionId());
SuperLuckyGiftIncomeAllot receiverIncomeAllot = incomeAllotService.calculate(partitionConfig, gift, giftMessage.getGiftNum(), Collections.singletonList(record.getReceiverUid()));
superLuckyGiftSendService.syncSettlement(giftMessage.getUid(), gift, giftMessage.getGiftNum(), giftMessage.getGiftNum(), room, receiverIncomeAllot, createTime);
logger.info("【处理lucky24 mq】 收礼收益已发放 messId: {} incomeAllot: {}", giftMessage.getMessId(), JSON.toJSONString(receiverIncomeAllot));
if (record.getWinGoldNum() > 0L){
settlementService.syncSendReward(partitionConfig, record.getUid(), room, gift, record.getWinGoldNum(), record.getAfterMultiple());
logger.info("【处理lucky24 mq】 送礼收益已发放 messId: {} senderUid: {} winGoldNum: {} afterMultiple: {}",
giftMessage.getMessId(), record.getUid(), record.getWinGoldNum(), record.getAfterMultiple());
}
// 后面都是异步发消息
if (record.getAfterMultiple() >= config.getWarnMulti()){
long totalGoldNum = giftMessage.getGiftNum() * giftMessage.getGiftGoldPrice();
robotMsgService.pushSuperMulti(record.getUid(), record.getReceiverUid(), record.getAfterMultiple(), totalGoldNum, record.getWinGoldNum(),
record.getRoomUid());
}
Users u = usersService.getNotNullUsersByUid(record.getUid());
if (null != u && config.getFollowErbanNoList().contains(u.getErbanNo())){
robotMsgService.pushFollowUser(record.getUid(), record.getReceiverUid(), record.getRoomUid());
}
// 删除该标识,表示消息已经消费过
jedisService.hdel(RedisKey.lucky_24_status.getKey(), giftMessage.getMessId());
}
private Lucky24Record insertRecord(Lucky24Message giftMessage) {
Lucky24Record record = new Lucky24Record();
record.setMessId(giftMessage.getMessId());
record.setPartitionId(giftMessage.getPartitionId());
record.setUid(giftMessage.getUid());
record.setReceiverUid(giftMessage.getReceiverUid());
record.setRoomUid(giftMessage.getRoomUid());
record.setGiftId(giftMessage.getGiftId());
record.setGiftGoldPrice(giftMessage.getGiftGoldPrice());
record.setGiftNum(giftMessage.getGiftNum());
record.setPoolId(giftMessage.getPoolId());
record.setIsSupplement(giftMessage.getIsSupplement());
record.setDrawMultiple(giftMessage.getDrawMultiple());
record.setAfterMultiple(giftMessage.getAfterMultiple());
record.setWinGoldNum(giftMessage.getWinGoldNum());
record.setCreateTime(new Date(giftMessage.getCreateTime()));
recordService.insertRecord(record);
return record;
}
}

View File

@@ -177,6 +177,12 @@ public class SuperLuckyGiftSendService {
room, incomeAllot, sendGiftTime);
}
public void syncSettlement(Long senderUid, Gift gift, int everyGiftNum, int totalGiftNum,
Room room, SuperLuckyGiftIncomeAllot incomeAllot, Date sendGiftTime){
settlement(senderUid, gift, everyGiftNum, totalGiftNum, room, incomeAllot, sendGiftTime);
}
@Async
//@Transactional(rollbackFor = Exception.class, transactionManager = "mybatisplusTransactionManager")
public void settlement(Long senderUid, Gift gift, int everyGiftNum, int totalGiftNum,

View File

@@ -33,6 +33,25 @@ public class Lucky24RecordService extends ServiceImpl<Lucky24RecordMapper, Lucky
@Resource(name = "bizExecutor")
private ThreadPoolExecutor bizExecutor;
public Lucky24Record buildRecord(long senderUid, int partitionId, Gift gift, int giftNum, Long roomUid, long receiverUid, int poolId,
boolean isSupplement, long drawMultiple, long afterMultiple, Date sendGiftTime) {
Lucky24Record record = new Lucky24Record();
record.setUid(senderUid);
record.setPartitionId(partitionId);
record.setReceiverUid(receiverUid);
record.setRoomUid(roomUid);
record.setGiftId(gift.getGiftId());
record.setGiftNum(giftNum);
record.setGiftGoldPrice(gift.getGoldPrice());
record.setPoolId(poolId);
record.setIsSupplement(isSupplement);
record.setDrawMultiple((int) drawMultiple);
record.setAfterMultiple((int) afterMultiple);
record.setWinGoldNum(afterMultiple * gift.getGoldPrice() * giftNum);
record.setCreateTime(sendGiftTime);
return record;
}
@Async
public void saveRecord(long senderUid, int partitionId, Gift gift, int giftNum, long receiverUid, int poolId,
boolean isSupplement, long drawMultiple, long afterMultiple, Date sendGiftTime) {
@@ -50,6 +69,10 @@ public class Lucky24RecordService extends ServiceImpl<Lucky24RecordMapper, Lucky
record.setWinGoldNum(afterMultiple * gift.getGoldPrice() * giftNum);
record.setCreateTime(sendGiftTime);
insertRecord(record);
}
public void insertRecord(Lucky24Record record) {
for (int i = 0; i < 3; i++) {
try {
record.setId(null);

View File

@@ -25,6 +25,10 @@ public class Lucky24SettlementService {
@Autowired
private SuperLuckyGiftSendService superLuckyGiftSendService;
public void syncSendReward(Lucky24GiftConfig config, long senderUid, Room room, Gift gift, long winGoldNum, long afterMultiple){
sendReward(config, senderUid, room, gift, winGoldNum, afterMultiple);
}
@Async
public void sendReward(Lucky24GiftConfig config, long senderUid, Room room, Gift gift, long winGoldNum, long afterMultiple){
// 道具奖励
@@ -33,7 +37,8 @@ public class Lucky24SettlementService {
//飘屏
if (null != room){
superLuckyGiftSendService.sendTip(senderUid, room, BigDecimal.valueOf(winGoldNum), BigDecimal.valueOf(afterMultiple), afterMultiple >= config.getSpecialTipMulti() ? 2 : 1);
superLuckyGiftSendService.sendTip(senderUid, room, BigDecimal.valueOf(winGoldNum), BigDecimal.valueOf(afterMultiple),
afterMultiple >= config.getSpecialTipMulti() ? 2 : 1);
if (winGoldNum >= config.getAllRoomChatToastValue()){
superLuckyGiftSendService.sendAllRoomScreen(senderUid, room, gift, BigDecimal.valueOf(winGoldNum));

View File

@@ -30,18 +30,6 @@ public class RocketMQService {
@Autowired
private MQMessageProducer mqMessageProducer;
/**
* 送礼物消息发送到MQ
*
* @param giftMessage
*/
public void sendGiftMessage(GiftMessage giftMessage) {
mqMessageProducer.send(MqConstant.GIFT_TOPIC, giftMessage, sendResult -> log.info("sendGiftMessage success message: {}", JSON.toJSONString(giftMessage)), throwable -> {
log.error("sendGiftMessage fail message: {}", JSON.toJSONString(giftMessage), throwable);
//SpringContextHolder.getBean(GiftMessageMQListener.class).onMessage(giftMessage);
});
}
/**
* 送礼物消息发送到MQ
*
@@ -59,6 +47,21 @@ public class RocketMQService {
}
}
/**
* 送礼物消息发送到MQ
*
* @param lucky24Messages
*/
public void asyncSendBatchLucky24Message(Collection<Lucky24Message> lucky24Messages) {
for (Lucky24Message lucky24Message : lucky24Messages){
mqMessageProducer.sendOrderly(MqConstant.LUCKY_24_TOPIC, lucky24Message, lucky24Message.getReceiverUid().toString(),
sendResult -> log.info("sendLucky24Message success message: {} queue {}", JSON.toJSONString(lucky24Message),
sendResult.getMessageQueue().getQueueId()), throwable -> {
log.error("sendLucky24Message fail message: {}", JSON.toJSONString(lucky24Message), throwable);
});
}
}
/**
* 发送开箱子中奖消息
*/

View File

@@ -32,4 +32,7 @@ public interface MqConstant {
String ACT_TASK_REWARD_CONSUME_GROUP = "act_task_reward_consume_group";
String LUCKY_24_TOPIC = "lucky_24_topic";
String LUCKY_24_CONSUME_GROUP = "lucky_24_group";
}

View File

@@ -32,8 +32,8 @@ public abstract class AbstractMessageListener<T extends BaseMqMessage> implement
@Override
public void onMessage(String message) {
try {
log.info("====mq message start====");
log.info("text message : {}", message);
//log.info("====mq message start====");
//log.info("text message : {}", message);
if (!message.startsWith(StrUtil.DELIM_START) || !message.endsWith(StrUtil.DELIM_END)) {
return;
}

View File

@@ -0,0 +1,29 @@
package com.accompany.mq.consumer;
import com.accompany.business.message.Lucky24Message;
import com.accompany.business.service.gift.Lucky24MessageService;
import com.accompany.mq.constant.MqConstant;
import com.accompany.mq.listener.AbstractMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
@RocketMQMessageListener(topic = MqConstant.LUCKY_24_TOPIC, consumerGroup = MqConstant.LUCKY_24_CONSUME_GROUP, consumeMode = ConsumeMode.ORDERLY)
public class Lucky24MessageConsumer extends AbstractMessageListener<Lucky24Message> {
@Autowired
private Lucky24MessageService lucky24MessageService;
@Override
public void onMessage(Lucky24Message giftMessage) {
log.info("onMessage lucky24Message: {}", giftMessage.toString());
lucky24MessageService.handleGiftMessage(giftMessage);
}
}

View File

@@ -1,9 +1,15 @@
package com.accompany.scheduler.task.luckyBag;
import com.accompany.business.message.GiftMessage;
import com.accompany.business.message.Lucky24Message;
import com.accompany.business.service.gift.Lucky24MessageService;
import com.accompany.business.service.lucky.Lucky24RecordService;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.utils.DateTimeUtil;
import com.accompany.core.model.PartitionInfo;
import com.accompany.core.service.common.JedisService;
import com.accompany.core.service.partition.PartitionInfoService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
@@ -15,6 +21,7 @@ import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
@Component
@@ -28,6 +35,39 @@ public class Lucky24Task {
@Resource(name = "bizExecutor")
private ThreadPoolExecutor bizExecutor;
@Autowired
private JedisService jedisService;
@Autowired
private Lucky24MessageService lucky24MessageService;
/**
* 重新消费队列的消息
*/
@Scheduled(cron = "0 */5 * * * ?")
public void retryLucky24Queue() {
log.info("retryLucky24Queue start ...");
Map<String, String> map = jedisService.hgetAll(RedisKey.lucky_24_status.getKey());
if (map == null || map.size() == 0) {
return;
}
long curTime = System.currentTimeMillis();
long gapTime = 1000 * 60 * 10; // 十分钟内没被消费
map.entrySet().parallelStream().forEach(entry -> {
try {
String messId = entry.getKey();
String val = entry.getValue();
Lucky24Message giftMessage = JSON.parseObject(val, Lucky24Message.class);
if (curTime - giftMessage.getCreateTime() > gapTime) {
lucky24MessageService.handleGiftMessage(giftMessage);
}
} catch (Exception e) {
log.error("retryLucky24Queue error", e);
}
});
log.info("retryLucky24Queue end ...");
}
@Scheduled(cron = "0 2 * * * ? ")
public void lucky24RecordStat() {
Date now = new Date();