账单-雪花主键-无锁化唯一主键插入
This commit is contained in:
@@ -21,10 +21,8 @@ public interface BillRecordMapper extends BaseMapper<BillRecord> {
|
||||
|
||||
/**
|
||||
* 批量插入账单记录,使用 INSERT IGNORE 忽略重复记录
|
||||
* @param billRecords 账单记录列表
|
||||
* @return 插入的记录数
|
||||
*/
|
||||
int insertIgnore(@Param("billRecords") List<BillRecord> billRecords);
|
||||
int insertIgnore(@Param("record") BillRecord billRecord);
|
||||
|
||||
Long getGiftSumByGift();
|
||||
|
||||
|
@@ -15,49 +15,47 @@
|
||||
|
||||
<insert id="insertIgnore">
|
||||
INSERT IGNORE INTO bill_record (
|
||||
bill_id,
|
||||
uid,
|
||||
partition_id,
|
||||
target_uid,
|
||||
room_uid,
|
||||
bill_type,
|
||||
obj_id,
|
||||
obj_type,
|
||||
gift_id,
|
||||
gift_num,
|
||||
gift_total_gold_num,
|
||||
currency,
|
||||
before_amount,
|
||||
amount,
|
||||
actual_amount,
|
||||
after_amount,
|
||||
create_time,
|
||||
remark,
|
||||
mess_id
|
||||
bill_id,
|
||||
uid,
|
||||
partition_id,
|
||||
target_uid,
|
||||
room_uid,
|
||||
bill_type,
|
||||
obj_id,
|
||||
obj_type,
|
||||
gift_id,
|
||||
gift_num,
|
||||
gift_total_gold_num,
|
||||
currency,
|
||||
before_amount,
|
||||
amount,
|
||||
actual_amount,
|
||||
after_amount,
|
||||
create_time,
|
||||
remark,
|
||||
mess_id
|
||||
) VALUES
|
||||
<foreach collection="billRecords" item="record" separator=",">
|
||||
(
|
||||
#{record.billId},
|
||||
#{record.uid},
|
||||
#{record.partitionId},
|
||||
#{record.targetUid},
|
||||
#{record.roomUid},
|
||||
#{record.billType},
|
||||
#{record.objId},
|
||||
#{record.objType},
|
||||
#{record.giftId},
|
||||
#{record.giftNum},
|
||||
#{record.giftTotalGoldNum},
|
||||
#{record.currency},
|
||||
#{record.beforeAmount},
|
||||
#{record.amount},
|
||||
#{record.actualAmount},
|
||||
#{record.afterAmount},
|
||||
#{record.createTime},
|
||||
#{record.remark},
|
||||
#{record.messId}
|
||||
)
|
||||
</foreach>
|
||||
(
|
||||
#{record.billId},
|
||||
#{record.uid},
|
||||
#{record.partitionId},
|
||||
#{record.targetUid},
|
||||
#{record.roomUid},
|
||||
#{record.billType},
|
||||
#{record.objId},
|
||||
#{record.objType},
|
||||
#{record.giftId},
|
||||
#{record.giftNum},
|
||||
#{record.giftTotalGoldNum},
|
||||
#{record.currency},
|
||||
#{record.beforeAmount},
|
||||
#{record.amount},
|
||||
#{record.actualAmount},
|
||||
#{record.afterAmount},
|
||||
#{record.createTime},
|
||||
#{record.remark},
|
||||
#{record.messId}
|
||||
)
|
||||
</insert>
|
||||
|
||||
<select id="getGiftSumByGift" resultType="long">
|
||||
|
@@ -91,29 +91,13 @@ public class BillMessageService implements InitializingBean {
|
||||
}
|
||||
}
|
||||
|
||||
public void handleBillMessage(List<BillMessage> billMessageList) {
|
||||
List<Long> uidList = billMessageList.stream().map(BillMessage::getUid).distinct().toList();
|
||||
Map<Long, Users> usersMap = usersService.getUsersMapByUids(uidList);
|
||||
public void handleBillMessageV2(BillMessage billMessage) {
|
||||
|
||||
List<BillRecord> poList = billMessageList.stream().map(billMessage -> {
|
||||
BillRecord billRecord = new BillRecord();
|
||||
BeanUtils.copyProperties(billMessage, billRecord);
|
||||
Users u = usersMap.get(billMessage.getUid());
|
||||
if (null != u){
|
||||
billRecord.setPartitionId(u.getPartitionId());
|
||||
}
|
||||
BillRecord billRecord = insertBillRecordIgnore(billMessage);
|
||||
log.info("【处理账单mq】 billRecord 插入成功 id:{} messId: {} mess:{}",
|
||||
billRecord.getBillId(), billMessage.getMessId(), JSON.toJSONString(billMessage));
|
||||
|
||||
if (null == billRecord.getBillId()){
|
||||
billRecord.setBillId(DefaultIdentifierGenerator.getInstance().nextId(null));
|
||||
}
|
||||
|
||||
return billRecord;
|
||||
}).toList();
|
||||
|
||||
billRecordMapper.insertIgnore(poList);
|
||||
|
||||
String[] messIdList = billMessageList.stream().map(BillMessage::getMessId).toArray(String[]::new);
|
||||
recordMessMap.fastRemove(messIdList);
|
||||
recordMessMap.fastRemove(billMessage.getMessId());
|
||||
}
|
||||
|
||||
private BillRecord insertBillRecord(BillMessage billMessage) {
|
||||
@@ -139,6 +123,37 @@ public class BillMessageService implements InitializingBean {
|
||||
throw new ServiceException(BusiStatus.SERVERBUSY);
|
||||
}
|
||||
|
||||
private BillRecord insertBillRecordIgnore(BillMessage billMessage) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
long copyPropertiesTime = 0;
|
||||
long getUserTime = 0;
|
||||
long insertTime = 0;
|
||||
|
||||
BillRecord billRecord = new BillRecord();
|
||||
BeanUtils.copyProperties(billMessage, billRecord);
|
||||
copyPropertiesTime = System.currentTimeMillis();
|
||||
|
||||
Users u = usersService.getUsersByUid(billMessage.getUid());
|
||||
getUserTime = System.currentTimeMillis();
|
||||
if (null != u){
|
||||
billRecord.setPartitionId(u.getPartitionId());
|
||||
}
|
||||
|
||||
int insertRow = billRecordMapper.insertIgnore(billRecord);
|
||||
insertTime = System.currentTimeMillis();
|
||||
|
||||
long endTime = System.currentTimeMillis();
|
||||
|
||||
log.info("insertBillRecordIgnore row {} performance - copy: {}ms, getUser: {}ms, insert: {}ms, total: {}ms",
|
||||
insertRow,
|
||||
copyPropertiesTime - startTime,
|
||||
getUserTime - copyPropertiesTime,
|
||||
insertTime - getUserTime,
|
||||
endTime - startTime);
|
||||
|
||||
return billRecord;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
recordMessMap = redissonClient.getMap(RedisKey.bill_record_message.getKey());
|
||||
|
@@ -29,7 +29,7 @@ public class RocketMQService {
|
||||
* 送消息,发送到MQ
|
||||
*/
|
||||
public void sendBillRecordMessage(BillMessage billMessage) {
|
||||
mqMessageProducer.send(MqConstant.BILL_RECORD_TOPIC, billMessage,
|
||||
mqMessageProducer.send(MqConstant.BILL_RECORD_V2_TOPIC, billMessage,
|
||||
sendResult -> log.info("sendBillRecordMessage success message: {} queue {}", JSON.toJSONString(billMessage), sendResult.getMessageQueue().getQueueId()),
|
||||
throwable -> log.error("sendBillRecordMessage fail message: {}", JSON.toJSONString(billMessage), throwable));
|
||||
}
|
||||
|
@@ -35,6 +35,9 @@ public interface MqConstant {
|
||||
String BILL_RECORD_TOPIC = "bill_record_topic";
|
||||
String BILL_RECORD_CONSUME_GROUP = "bill_record_consume_group";
|
||||
|
||||
String BILL_RECORD_V2_TOPIC = "bill_record_v2_topic";
|
||||
String BILL_RECORD_V2_CONSUME_GROUP = "bill_record_v2_consume_group";
|
||||
|
||||
String BRAVO_TOPIC = "bravo_topic";
|
||||
String BRAVO_CONSUME_GROUP = "bravo_consume_group";
|
||||
|
||||
|
@@ -5,7 +5,6 @@ import com.accompany.business.service.gift.BillMessageService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
|
@@ -0,0 +1,28 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.BillMessage;
|
||||
import com.accompany.business.service.gift.BillMessageService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.BILL_RECORD_V2_TOPIC, consumerGroup = MqConstant.BILL_RECORD_V2_CONSUME_GROUP)
|
||||
public class BillMessageV2Consumer extends AbstractMessageListener<BillMessage> {
|
||||
|
||||
@Autowired
|
||||
private BillMessageService billMessageService;
|
||||
|
||||
@Override
|
||||
public void onMessage(BillMessage billMessage) {
|
||||
log.info("onMessage billMessage v2: {}", billMessage.toString());
|
||||
billMessageService.handleBillMessageV2(billMessage);
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user