幸运24-雪花主键-mq-无锁化唯一主键插入
This commit is contained in:
@@ -1337,6 +1337,7 @@ public enum RedisKey {
|
||||
lucky_24_user_history,
|
||||
lucky_24_user_lock,
|
||||
lucky_24_robot_push_msg,
|
||||
lucky_24_record_message, // 礼物消息的状态
|
||||
lucky_24_status, // 礼物消息的状态
|
||||
lock_lucky_24_message, // 消费送礼物消息锁
|
||||
lucky_24_user_10w_stat, // 消费送礼物消息锁
|
||||
|
@@ -11,7 +11,7 @@ import java.util.Date;
|
||||
@Data
|
||||
public class Lucky24Record {
|
||||
|
||||
@TableId(type = IdType.AUTO)
|
||||
@TableId(type = IdType.INPUT)
|
||||
private Long id;
|
||||
private Integer partitionId;
|
||||
private Long uid;
|
||||
|
@@ -11,6 +11,11 @@ import java.util.List;
|
||||
|
||||
public interface Lucky24RecordMapper extends BaseMapper<Lucky24Record> {
|
||||
|
||||
/**
|
||||
* 批量插入记录,使用 INSERT IGNORE 忽略重复记录
|
||||
*/
|
||||
int insertIgnore(@Param("record") Lucky24Record record);
|
||||
|
||||
List<Lucky24PlatformStat> listPlatform(@Param("zoneDate")String zoneDate, @Param("partitionId") Integer partitionId,
|
||||
@Param("poolTypeList")List<Integer> poolTypeList,
|
||||
@Param("startTime") Date startTime, @Param("endTime") Date endTime);
|
||||
|
@@ -2,6 +2,47 @@
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
||||
<mapper namespace="com.accompany.sharding.mapper.Lucky24RecordMapper">
|
||||
|
||||
<insert id="insertIgnore">
|
||||
INSERT IGNORE INTO lucky_24_record (
|
||||
id,
|
||||
partition_id,
|
||||
uid,
|
||||
receiver_uid,
|
||||
room_uid,
|
||||
gift_id,
|
||||
gift_gold_price,
|
||||
gift_num,
|
||||
pool_type,
|
||||
pool_id,
|
||||
is_supplement,
|
||||
draw_multiple,
|
||||
after_multiple,
|
||||
win_gold_num,
|
||||
create_time,
|
||||
stock_result,
|
||||
mess_id
|
||||
) VALUES
|
||||
(
|
||||
#{record.id},
|
||||
#{record.partitionId},
|
||||
#{record.uid},
|
||||
#{record.receiverUid},
|
||||
#{record.roomUid},
|
||||
#{record.giftId},
|
||||
#{record.giftGoldPrice},
|
||||
#{record.giftNum},
|
||||
#{record.poolType},
|
||||
#{record.poolId},
|
||||
#{record.isSupplement},
|
||||
#{record.drawMultiple},
|
||||
#{record.afterMultiple},
|
||||
#{record.winGoldNum},
|
||||
#{record.createTime},
|
||||
#{record.stockResult},
|
||||
#{record.messId}
|
||||
)
|
||||
</insert>
|
||||
|
||||
<select id="listPlatform" resultType="com.accompany.sharding.vo.Lucky24PlatformStat">
|
||||
select #{zoneDate} as `date`,
|
||||
#{partitionId} as partition_id,
|
||||
|
@@ -8,6 +8,7 @@ import lombok.Data;
|
||||
*/
|
||||
@Data
|
||||
public class Lucky24Message extends BaseMqMessage {
|
||||
private Long id;
|
||||
private Integer partitionId;
|
||||
private Long uid;
|
||||
private Long receiverUid;
|
||||
|
@@ -21,6 +21,7 @@ import com.accompany.sharding.model.Lucky24Record;
|
||||
import com.accompany.sharding.vo.Lucky24StockResultVo;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.baomidou.mybatisplus.core.incrementer.DefaultIdentifierGenerator;
|
||||
import com.baomidou.mybatisplus.core.incrementer.IdentifierGenerator;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RMap;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -59,11 +60,13 @@ public class Lucky24GiftSendService {
|
||||
@Autowired
|
||||
private RocketMQService rocketMQService;
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
@Autowired
|
||||
private Lucky24ExtraService extraService;
|
||||
@Autowired
|
||||
private UserRechargeLevelService userRechargeLevelService;
|
||||
@Autowired
|
||||
private Lucky24MessageService messageService;
|
||||
@Autowired
|
||||
private IdentifierGenerator identifierGenerator;
|
||||
|
||||
public void draw(long senderUid, Integer partitionId, Room room, List<Long> receiverList,
|
||||
Gift gift, int everyGiftNum, Date sendGiftTime) {
|
||||
@@ -203,16 +206,18 @@ public class Lucky24GiftSendService {
|
||||
}
|
||||
|
||||
private void sendMq(Map<Long, Lucky24Record> recordMap) {
|
||||
Map<String, String> caches = new HashMap<>(recordMap.size());
|
||||
Map<String, Lucky24Message> caches = new HashMap<>(recordMap.size());
|
||||
List<Lucky24Message> messageList = new ArrayList<>();
|
||||
|
||||
DefaultIdentifierGenerator idGenerator = DefaultIdentifierGenerator.getInstance();
|
||||
|
||||
for (Lucky24Record record: recordMap.values()){
|
||||
String id = idGenerator.nextUUID(null);
|
||||
long id = identifierGenerator.nextId(null).longValue();
|
||||
String messId = idGenerator.nextUUID(null);
|
||||
|
||||
Lucky24Message message = new Lucky24Message();
|
||||
message.setMessId(id);
|
||||
message.setId(id);
|
||||
message.setMessId(messId);
|
||||
message.setPartitionId(record.getPartitionId());
|
||||
message.setUid(record.getUid());
|
||||
message.setReceiverUid(record.getReceiverUid());
|
||||
@@ -231,10 +236,10 @@ public class Lucky24GiftSendService {
|
||||
|
||||
messageList.add(message);
|
||||
|
||||
caches.put(id, JSON.toJSONString(message));
|
||||
caches.put(messId, message);
|
||||
}
|
||||
|
||||
jedisService.hwrite(RedisKey.lucky_24_status.getKey(), caches);
|
||||
messageService.getRecordMessMap().putAll(caches);
|
||||
|
||||
rocketMQService.sendBatchLucky24Message(messageList);
|
||||
}
|
||||
|
@@ -16,7 +16,12 @@ import com.accompany.core.model.Room;
|
||||
import com.accompany.core.service.base.BaseService;
|
||||
import com.accompany.sharding.model.Lucky24Record;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.baomidou.mybatisplus.core.incrementer.IdentifierGenerator;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RMap;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
@@ -26,7 +31,12 @@ import java.util.Date;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
public class Lucky24MessageService extends BaseService {
|
||||
public class Lucky24MessageService extends BaseService implements InitializingBean {
|
||||
|
||||
@Autowired
|
||||
private RedissonClient redissonClient;
|
||||
@Getter
|
||||
private RMap<String, Lucky24Message> recordMessMap;
|
||||
|
||||
@Autowired
|
||||
private Lucky24PoolMapper poolMapper;
|
||||
@@ -46,20 +56,10 @@ public class Lucky24MessageService extends BaseService {
|
||||
private GiftService giftService;
|
||||
@Autowired
|
||||
private Lucky24SendWeekRankService lucky24SendWeekRankService;
|
||||
@Autowired
|
||||
private IdentifierGenerator identifierGenerator;
|
||||
|
||||
public void handleGiftMessage(Lucky24Message giftMessage) {
|
||||
// 防止消息被重复消费
|
||||
if (!jedisLockService.isExist(RedisKey.lock_lucky_24_message.getKey(giftMessage.getMessId()), 30)) {
|
||||
logger.warn("handleLucky24Message giftMessage had handle, mess: " + giftMessage);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!jedisService.hexists(RedisKey.lucky_24_status.getKey(), giftMessage.getMessId())){
|
||||
logger.warn("handleLucky24Message giftMessage had handle, mess: " + giftMessage);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info("【处理lucky24 mq】 开始处理 giftMessage: {}", JSON.toJSONString(giftMessage));
|
||||
public void handleMessage(Lucky24Message giftMessage) {
|
||||
|
||||
Room room = null != giftMessage.getRoomUid()? roomService.getRoomByUid(giftMessage.getRoomUid()): null;
|
||||
Gift gift = giftService.getGiftById(giftMessage.getGiftId());
|
||||
@@ -78,25 +78,57 @@ public class Lucky24MessageService extends BaseService {
|
||||
|
||||
logger.info("【处理lucky24 mq】 收礼收益已发放 messId: {} incomeAllot: {}", giftMessage.getMessId(), JSON.toJSONString(receiverIncomeAllot));
|
||||
|
||||
// 后面都是异步发消息
|
||||
// if (record.getAfterMultiple() >= config.getWarnMulti()){
|
||||
// long totalGoldNum = giftMessage.getGiftNum() * giftMessage.getGiftGoldPrice();
|
||||
// robotMsgService.pushSuperMulti(record.getUid(), record.getReceiverUid(), record.getAfterMultiple(), totalGoldNum, record.getWinGoldNum(),
|
||||
// record.getRoomUid());
|
||||
// }
|
||||
|
||||
// 异步,报错不会触发mq重试
|
||||
if (!CollectionUtils.isEmpty(config.getFollowUidList()) && config.getFollowUidList().contains(record.getUid())){
|
||||
robotMsgService.pushFollowUser(record.getUid(), record.getReceiverUid(), record.getRoomUid());
|
||||
}
|
||||
|
||||
// 异步,报错不会触发mq重试
|
||||
lucky24SendWeekRankService.updateRank(record);
|
||||
|
||||
// 删除该标识,表示消息已经消费过
|
||||
jedisService.hdel(RedisKey.lucky_24_status.getKey(), giftMessage.getMessId());
|
||||
}
|
||||
|
||||
public void handleMessageV2(Lucky24Message giftMessage) {
|
||||
|
||||
Room room = null != giftMessage.getRoomUid()? roomService.getRoomByUid(giftMessage.getRoomUid()): null;
|
||||
Gift gift = giftService.getGiftById(giftMessage.getGiftId());
|
||||
Date createTime = new Date(giftMessage.getCreateTime());
|
||||
|
||||
Lucky24Record record = insertRecordV2(giftMessage);
|
||||
|
||||
log.info("【处理lucky24 mq】 record 插入成功 messId:{} recordId:{} record:{}",
|
||||
giftMessage.getMessId(), record.getId(), JSON.toJSONString(record));
|
||||
|
||||
// 收礼者收益
|
||||
Lucky24GiftConfig config = sendService.getConfig();
|
||||
Lucky24GiftConfig partitionConfig = config.getRatioByPartitionId(giftMessage.getPartitionId());
|
||||
SuperLuckyGiftIncomeAllot receiverIncomeAllot = incomeAllotService.calculate(partitionConfig, gift, giftMessage.getGiftNum(), Collections.singletonList(record.getReceiverUid()));
|
||||
superLuckyGiftSendService.syncSettlement(giftMessage.getUid(), gift, giftMessage.getGiftNum(), giftMessage.getGiftNum(), room, receiverIncomeAllot, createTime);
|
||||
|
||||
logger.info("【处理lucky24 mq】 收礼收益已发放 messId: {} incomeAllot: {}", giftMessage.getMessId(), JSON.toJSONString(receiverIncomeAllot));
|
||||
|
||||
// 异步,报错不会触发mq重试
|
||||
if (!CollectionUtils.isEmpty(config.getFollowUidList()) && config.getFollowUidList().contains(record.getUid())){
|
||||
robotMsgService.pushFollowUser(record.getUid(), record.getReceiverUid(), record.getRoomUid());
|
||||
}
|
||||
|
||||
// 异步,报错不会触发mq重试
|
||||
lucky24SendWeekRankService.updateRank(record);
|
||||
|
||||
// 删除该标识,表示消息已经消费过
|
||||
recordMessMap.fastRemove(giftMessage.getMessId());
|
||||
}
|
||||
|
||||
private Lucky24Record insertRecord(Lucky24Message giftMessage) {
|
||||
Lucky24Record record = new Lucky24Record();
|
||||
|
||||
if (null == giftMessage.getId()){
|
||||
giftMessage.setId(identifierGenerator.nextId( null).longValue());
|
||||
}
|
||||
|
||||
record.setId(giftMessage.getId());
|
||||
record.setMessId(giftMessage.getMessId());
|
||||
record.setPartitionId(giftMessage.getPartitionId());
|
||||
record.setUid(giftMessage.getUid());
|
||||
@@ -127,4 +159,47 @@ public class Lucky24MessageService extends BaseService {
|
||||
return record;
|
||||
}
|
||||
|
||||
private Lucky24Record insertRecordV2(Lucky24Message giftMessage) {
|
||||
Lucky24Record record = new Lucky24Record();
|
||||
|
||||
if (null == giftMessage.getId()){
|
||||
giftMessage.setId(identifierGenerator.nextId( null).longValue());
|
||||
}
|
||||
|
||||
record.setId(giftMessage.getId());
|
||||
record.setMessId(giftMessage.getMessId());
|
||||
record.setPartitionId(giftMessage.getPartitionId());
|
||||
record.setUid(giftMessage.getUid());
|
||||
record.setReceiverUid(giftMessage.getReceiverUid());
|
||||
record.setRoomUid(giftMessage.getRoomUid());
|
||||
record.setGiftId(giftMessage.getGiftId());
|
||||
record.setGiftGoldPrice(giftMessage.getGiftGoldPrice());
|
||||
record.setGiftNum(giftMessage.getGiftNum());
|
||||
record.setPoolType(giftMessage.getPoolType());
|
||||
record.setPoolId(giftMessage.getPoolId());
|
||||
|
||||
if (null == giftMessage.getPoolType() && null != giftMessage.getPoolId()){
|
||||
Lucky24Pool pool = poolMapper.selectById(giftMessage.getPoolId());
|
||||
if (null != pool){
|
||||
record.setPoolType(pool.getType());
|
||||
}
|
||||
}
|
||||
|
||||
record.setIsSupplement(giftMessage.getIsSupplement());
|
||||
record.setDrawMultiple(giftMessage.getDrawMultiple());
|
||||
record.setAfterMultiple(giftMessage.getAfterMultiple());
|
||||
record.setWinGoldNum(giftMessage.getWinGoldNum());
|
||||
record.setCreateTime(new Date(giftMessage.getCreateTime()));
|
||||
record.setStockResult(giftMessage.getStockResult());
|
||||
|
||||
recordService.insertRecordIgnore(record);
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
recordMessMap = redissonClient.getMap(RedisKey.lucky_24_record_message.getKey());
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -1,12 +1,15 @@
|
||||
package com.accompany.business.service.lucky;
|
||||
|
||||
import com.accompany.business.constant.Lucky24PoolTypeEnum;
|
||||
import com.accompany.business.message.BillMessage;
|
||||
import com.accompany.business.model.Gift;
|
||||
import com.accompany.business.mybatismapper.lucky.Lucky24StatMapper;
|
||||
import com.accompany.common.status.BusiStatus;
|
||||
import com.accompany.common.utils.DateTimeUtil;
|
||||
import com.accompany.core.exception.ServiceException;
|
||||
import com.accompany.core.model.Users;
|
||||
import com.accompany.sharding.mapper.Lucky24RecordMapper;
|
||||
import com.accompany.sharding.model.BillRecord;
|
||||
import com.accompany.sharding.model.Lucky24Record;
|
||||
import com.accompany.sharding.vo.Lucky24PersonalStat;
|
||||
import com.accompany.sharding.vo.Lucky24PlatformStat;
|
||||
@@ -16,6 +19,7 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -61,6 +65,18 @@ public class Lucky24RecordService extends ServiceImpl<Lucky24RecordMapper, Lucky
|
||||
return record;
|
||||
}
|
||||
|
||||
public void insertRecordIgnore(Lucky24Record record) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
int insertRow = baseMapper.insertIgnore(record);
|
||||
|
||||
long endTime = System.currentTimeMillis();
|
||||
|
||||
log.info("insertLucky242RecordIgnore row {} performance - insert: {}ms",
|
||||
insertRow,
|
||||
endTime - startTime);
|
||||
}
|
||||
|
||||
public void insertRecord(Lucky24Record record) {
|
||||
for (int i = 0; i < 3; i++) {
|
||||
try {
|
||||
|
@@ -32,6 +32,9 @@ public interface MqConstant {
|
||||
String LUCKY_24_TOPIC = "lucky_24_topic";
|
||||
String LUCKY_24_CONSUME_GROUP = "lucky_24_consume_group";
|
||||
|
||||
String LUCKY_24_V2_TOPIC = "lucky_24_v2_topic";
|
||||
String LUCKY_24_V2_CONSUME_GROUP = "lucky_24_v2_consume_group";
|
||||
|
||||
String BILL_RECORD_TOPIC = "bill_record_topic";
|
||||
String BILL_RECORD_CONSUME_GROUP = "bill_record_consume_group";
|
||||
|
||||
|
@@ -22,7 +22,7 @@ public class Lucky24MessageConsumer extends AbstractMessageListener<Lucky24Messa
|
||||
@Override
|
||||
public void onMessage(Lucky24Message giftMessage) {
|
||||
log.info("onMessage lucky24Message: {}", giftMessage.toString());
|
||||
lucky24MessageService.handleGiftMessage(giftMessage);
|
||||
lucky24MessageService.handleMessage(giftMessage);
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -0,0 +1,28 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.Lucky24Message;
|
||||
import com.accompany.business.service.gift.Lucky24MessageService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.LUCKY_24_V2_TOPIC, consumerGroup = MqConstant.LUCKY_24_V2_CONSUME_GROUP)
|
||||
public class Lucky24MessageV2Consumer extends AbstractMessageListener<Lucky24Message> {
|
||||
|
||||
@Autowired
|
||||
private Lucky24MessageService lucky24MessageService;
|
||||
|
||||
@Override
|
||||
public void onMessage(Lucky24Message giftMessage) {
|
||||
log.info("onMessage lucky24MessageV2: {}", giftMessage.toString());
|
||||
lucky24MessageService.handleMessageV2(giftMessage);
|
||||
}
|
||||
|
||||
}
|
@@ -24,7 +24,7 @@ public class BillTask extends BaseTask {
|
||||
@Scheduled(cron = "0 */5 * * * ?")
|
||||
public void retryBillQueue() {
|
||||
log.info("retryBillQueue start ...");
|
||||
Map<String, BillMessage> map = billMessageService.getRecordMessMap().randomEntries(10000);
|
||||
Map<String, BillMessage> map = billMessageService.getRecordMessMap().randomEntries(2000);
|
||||
if (CollectionUtils.isEmpty(map)) {
|
||||
return;
|
||||
}
|
||||
|
@@ -13,6 +13,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.Duration;
|
||||
@@ -56,7 +57,34 @@ public class Lucky24Task {
|
||||
String val = entry.getValue();
|
||||
Lucky24Message giftMessage = JSON.parseObject(val, Lucky24Message.class);
|
||||
if (curTime - giftMessage.getCreateTime() > gapTime) {
|
||||
lucky24MessageService.handleGiftMessage(giftMessage);
|
||||
lucky24MessageService.handleMessage(giftMessage);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("retryLucky24Queue error", e);
|
||||
}
|
||||
});
|
||||
log.info("retryLucky24Queue end ...");
|
||||
}
|
||||
|
||||
/**
|
||||
* 重新消费队列的消息
|
||||
*/
|
||||
@Scheduled(cron = "0 */5 * * * ?")
|
||||
public void retryLucky24QueueV2() {
|
||||
log.info("retryLucky24Queue start ...");
|
||||
Map<String, Lucky24Message> map = lucky24MessageService.getRecordMessMap().randomEntries(2000);
|
||||
if (CollectionUtils.isEmpty(map)) {
|
||||
return;
|
||||
}
|
||||
long curTime = System.currentTimeMillis();
|
||||
long gapTime = 1000 * 60 * 10; // 十分钟内没被消费
|
||||
|
||||
map.entrySet().parallelStream().forEach(entry -> {
|
||||
try {
|
||||
String messId = entry.getKey();
|
||||
Lucky24Message giftMessage = entry.getValue();
|
||||
if (curTime - giftMessage.getCreateTime() > gapTime) {
|
||||
lucky24MessageService.handleMessageV2(giftMessage);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("retryLucky24Queue error", e);
|
||||
@@ -105,10 +133,4 @@ public class Lucky24Task {
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
Date abc = DateTimeUtil.convertStrToDate("2024-12-18 05:00:00", DateTimeUtil.DEFAULT_DATETIME_PATTERN);
|
||||
ZonedDateTime now = abc.toInstant().atZone(ZoneId.of("Asia/Riyadh"));
|
||||
long zoneIdHour = Duration.between(abc.toInstant(), DateTimeUtil.converLocalDateTimeToDate(now.toLocalDateTime()).toInstant()).toHours();
|
||||
System.out.println(now + "_" + DateTimeUtil.convertDate(abc, DateTimeUtil.DEFAULT_DATETIME_PATTERN) + "_" + zoneIdHour);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user