幸运24-雪花主键-mq-合并topic
This commit is contained in:
@@ -1338,8 +1338,6 @@ public enum RedisKey {
|
||||
lucky_24_user_lock,
|
||||
lucky_24_robot_push_msg,
|
||||
lucky_24_record_message, // 礼物消息的状态
|
||||
lucky_24_status, // 礼物消息的状态
|
||||
lock_lucky_24_message, // 消费送礼物消息锁
|
||||
lucky_24_user_10w_stat, // 消费送礼物消息锁
|
||||
|
||||
lucky_24_extra_stock,
|
||||
|
@@ -32,7 +32,7 @@ import java.util.Optional;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
public class Lucky24MessageService extends BaseService implements InitializingBean {
|
||||
public class Lucky24MessageService implements InitializingBean {
|
||||
|
||||
@Autowired
|
||||
private RedissonClient redissonClient;
|
||||
@@ -66,37 +66,6 @@ public class Lucky24MessageService extends BaseService implements InitializingBe
|
||||
Gift gift = giftService.getGiftById(giftMessage.getGiftId());
|
||||
Date createTime = new Date(giftMessage.getCreateTime());
|
||||
|
||||
Lucky24Record record = insertRecord(giftMessage);
|
||||
|
||||
log.info("【处理lucky24 mq】 record 插入成功 messId:{} recordId:{} record:{}",
|
||||
giftMessage.getMessId(), record.getId(), JSON.toJSONString(record));
|
||||
|
||||
// 收礼者收益
|
||||
Lucky24GiftConfig config = sendService.getConfig();
|
||||
Lucky24GiftConfig partitionConfig = config.getRatioByPartitionId(giftMessage.getPartitionId());
|
||||
SuperLuckyGiftIncomeAllot receiverIncomeAllot = incomeAllotService.calculate(partitionConfig, gift, giftMessage.getGiftNum(), Collections.singletonList(record.getReceiverUid()));
|
||||
superLuckyGiftSendService.syncSettlement(giftMessage.getUid(), gift, giftMessage.getGiftNum(), giftMessage.getGiftNum(), room, receiverIncomeAllot, createTime);
|
||||
|
||||
logger.info("【处理lucky24 mq】 收礼收益已发放 messId: {} incomeAllot: {}", giftMessage.getMessId(), JSON.toJSONString(receiverIncomeAllot));
|
||||
|
||||
// 异步,报错不会触发mq重试
|
||||
if (!CollectionUtils.isEmpty(config.getFollowUidList()) && config.getFollowUidList().contains(record.getUid())){
|
||||
robotMsgService.pushFollowUser(record.getUid(), record.getReceiverUid(), record.getRoomUid());
|
||||
}
|
||||
|
||||
// 异步,报错不会触发mq重试
|
||||
lucky24SendWeekRankService.updateRank(record);
|
||||
|
||||
// 删除该标识,表示消息已经消费过
|
||||
jedisService.hdel(RedisKey.lucky_24_status.getKey(), giftMessage.getMessId());
|
||||
}
|
||||
|
||||
public void handleMessageV2(Lucky24Message giftMessage) {
|
||||
|
||||
Room room = null != giftMessage.getRoomUid()? roomService.getRoomByUid(giftMessage.getRoomUid()): null;
|
||||
Gift gift = giftService.getGiftById(giftMessage.getGiftId());
|
||||
Date createTime = new Date(giftMessage.getCreateTime());
|
||||
|
||||
Optional<Lucky24Record> recordOptional = insertRecordIgnore(giftMessage);
|
||||
if (recordOptional.isEmpty()){
|
||||
return;
|
||||
@@ -113,7 +82,7 @@ public class Lucky24MessageService extends BaseService implements InitializingBe
|
||||
SuperLuckyGiftIncomeAllot receiverIncomeAllot = incomeAllotService.calculate(partitionConfig, gift, giftMessage.getGiftNum(), Collections.singletonList(record.getReceiverUid()));
|
||||
superLuckyGiftSendService.syncSettlement(giftMessage.getUid(), gift, giftMessage.getGiftNum(), giftMessage.getGiftNum(), room, receiverIncomeAllot, createTime);
|
||||
|
||||
logger.info("【处理lucky24 mq】 收礼收益已发放 messId: {} incomeAllot: {}", giftMessage.getMessId(), JSON.toJSONString(receiverIncomeAllot));
|
||||
log.info("【处理lucky24 mq】 收礼收益已发放 messId: {} incomeAllot: {}", giftMessage.getMessId(), JSON.toJSONString(receiverIncomeAllot));
|
||||
|
||||
// 异步,报错不会触发mq重试
|
||||
if (!CollectionUtils.isEmpty(config.getFollowUidList()) && config.getFollowUidList().contains(record.getUid())){
|
||||
@@ -127,44 +96,6 @@ public class Lucky24MessageService extends BaseService implements InitializingBe
|
||||
recordMessMap.fastRemove(giftMessage.getMessId());
|
||||
}
|
||||
|
||||
private Lucky24Record insertRecord(Lucky24Message giftMessage) {
|
||||
Lucky24Record record = new Lucky24Record();
|
||||
|
||||
if (null == giftMessage.getId()){
|
||||
giftMessage.setId(identifierGenerator.nextId( null).longValue());
|
||||
}
|
||||
|
||||
record.setId(giftMessage.getId());
|
||||
record.setMessId(giftMessage.getMessId());
|
||||
record.setPartitionId(giftMessage.getPartitionId());
|
||||
record.setUid(giftMessage.getUid());
|
||||
record.setReceiverUid(giftMessage.getReceiverUid());
|
||||
record.setRoomUid(giftMessage.getRoomUid());
|
||||
record.setGiftId(giftMessage.getGiftId());
|
||||
record.setGiftGoldPrice(giftMessage.getGiftGoldPrice());
|
||||
record.setGiftNum(giftMessage.getGiftNum());
|
||||
record.setPoolType(giftMessage.getPoolType());
|
||||
record.setPoolId(giftMessage.getPoolId());
|
||||
|
||||
if (null == giftMessage.getPoolType() && null != giftMessage.getPoolId()){
|
||||
Lucky24Pool pool = poolMapper.selectById(giftMessage.getPoolId());
|
||||
if (null != pool){
|
||||
record.setPoolType(pool.getType());
|
||||
}
|
||||
}
|
||||
|
||||
record.setIsSupplement(giftMessage.getIsSupplement());
|
||||
record.setDrawMultiple(giftMessage.getDrawMultiple());
|
||||
record.setAfterMultiple(giftMessage.getAfterMultiple());
|
||||
record.setWinGoldNum(giftMessage.getWinGoldNum());
|
||||
record.setCreateTime(new Date(giftMessage.getCreateTime()));
|
||||
record.setStockResult(giftMessage.getStockResult());
|
||||
|
||||
recordService.insertRecord(record);
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
private Optional<Lucky24Record> insertRecordIgnore(Lucky24Message giftMessage) {
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
@@ -57,7 +57,7 @@ public class RocketMQService {
|
||||
List<Message<String>> messageList = lucky24Messages.stream()
|
||||
.map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build())
|
||||
.collect(Collectors.toList());
|
||||
mqMessageProducer.send(MqConstant.LUCKY_24_V2_TOPIC, messageList,
|
||||
mqMessageProducer.send(MqConstant.LUCKY_24_TOPIC, messageList,
|
||||
sendResult -> log.info("sendLucky24Message success result: {} message: {}", JSON.toJSONString(sendResult), messageList),
|
||||
throwable -> log.error("sendLucky24Message fail message: {}", messageList, throwable));
|
||||
}
|
||||
|
@@ -32,9 +32,6 @@ public interface MqConstant {
|
||||
String LUCKY_24_TOPIC = "lucky_24_topic";
|
||||
String LUCKY_24_CONSUME_GROUP = "lucky_24_consume_group";
|
||||
|
||||
String LUCKY_24_V2_TOPIC = "lucky_24_v2_topic";
|
||||
String LUCKY_24_V2_CONSUME_GROUP = "lucky_24_v2_consume_group";
|
||||
|
||||
String BILL_RECORD_TOPIC = "bill_record_topic";
|
||||
String BILL_RECORD_CONSUME_GROUP = "bill_record_consume_group";
|
||||
|
||||
|
@@ -1,28 +0,0 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.Lucky24Message;
|
||||
import com.accompany.business.service.gift.Lucky24MessageService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.LUCKY_24_V2_TOPIC, consumerGroup = MqConstant.LUCKY_24_V2_CONSUME_GROUP)
|
||||
public class Lucky24MessageV2Consumer extends AbstractMessageListener<Lucky24Message> {
|
||||
|
||||
@Autowired
|
||||
private Lucky24MessageService lucky24MessageService;
|
||||
|
||||
@Override
|
||||
public void onMessage(Lucky24Message giftMessage) {
|
||||
log.info("onMessage lucky24MessageV2: {}", giftMessage.toString());
|
||||
lucky24MessageService.handleMessageV2(giftMessage);
|
||||
}
|
||||
|
||||
}
|
@@ -3,26 +3,21 @@ package com.accompany.scheduler.task.luckyBag;
|
||||
import com.accompany.business.message.Lucky24Message;
|
||||
import com.accompany.business.service.gift.Lucky24MessageService;
|
||||
import com.accompany.business.service.lucky.Lucky24RecordService;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.common.utils.DateTimeUtil;
|
||||
import com.accompany.core.model.PartitionInfo;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.core.service.partition.PartitionInfoService;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.Duration;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@@ -32,40 +27,9 @@ public class Lucky24Task {
|
||||
private PartitionInfoService partitionInfoService;
|
||||
@Autowired
|
||||
private Lucky24RecordService service;
|
||||
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
@Autowired
|
||||
private Lucky24MessageService lucky24MessageService;
|
||||
|
||||
/**
|
||||
* 重新消费队列的消息
|
||||
*/
|
||||
@Scheduled(cron = "0 */5 * * * ?")
|
||||
public void retryLucky24Queue() {
|
||||
log.info("retryLucky24Queue start ...");
|
||||
Map<String, String> map = jedisService.hgetAll(RedisKey.lucky_24_status.getKey());
|
||||
if (map == null || map.size() == 0) {
|
||||
return;
|
||||
}
|
||||
long curTime = System.currentTimeMillis();
|
||||
long gapTime = 1000 * 60 * 10; // 十分钟内没被消费
|
||||
|
||||
map.entrySet().parallelStream().forEach(entry -> {
|
||||
try {
|
||||
String messId = entry.getKey();
|
||||
String val = entry.getValue();
|
||||
Lucky24Message giftMessage = JSON.parseObject(val, Lucky24Message.class);
|
||||
if (curTime - giftMessage.getCreateTime() > gapTime) {
|
||||
lucky24MessageService.handleMessage(giftMessage);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("retryLucky24Queue error", e);
|
||||
}
|
||||
});
|
||||
log.info("retryLucky24Queue end ...");
|
||||
}
|
||||
|
||||
/**
|
||||
* 重新消费队列的消息
|
||||
*/
|
||||
@@ -84,7 +48,7 @@ public class Lucky24Task {
|
||||
String messId = entry.getKey();
|
||||
Lucky24Message giftMessage = entry.getValue();
|
||||
if (curTime - giftMessage.getCreateTime() > gapTime) {
|
||||
lucky24MessageService.handleMessageV2(giftMessage);
|
||||
lucky24MessageService.handleMessage(giftMessage);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("retryLucky24Queue error", e);
|
||||
|
Reference in New Issue
Block a user