rocketmq-AbstractMessageListener去掉通用分布式锁
This commit is contained in:
@@ -11,6 +11,7 @@ import com.accompany.business.activity.service.ActTaskRewardService;
|
||||
import com.accompany.business.activity.service.ActUserTaskService;
|
||||
import com.accompany.business.activity.strategy.ActRewardFactory;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.accompany.mq.model.ActTaskRewardMqMessage;
|
||||
@@ -44,6 +45,8 @@ public class ActTaskRewardConsumer extends AbstractMessageListener<ActTaskReward
|
||||
|
||||
@Resource(name = "async-executor")
|
||||
private ThreadPoolExecutor asyncExecutor;
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
|
||||
@Override
|
||||
protected void onMessage(ActTaskRewardMqMessage object) {
|
||||
@@ -86,8 +89,4 @@ public class ActTaskRewardConsumer extends AbstractMessageListener<ActTaskReward
|
||||
actUserTaskService.updateById(actUserTask);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RedisKey mqLock() {
|
||||
return RedisKey.mq_status;
|
||||
}
|
||||
}
|
||||
|
@@ -16,6 +16,7 @@ import com.accompany.business.activity.service.ActUserTaskService;
|
||||
import com.accompany.common.constant.Constant;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.core.base.SpringContextHolder;
|
||||
import com.accompany.core.service.common.JedisLockService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.accompany.mq.model.ActTaskRewardMqMessage;
|
||||
@@ -58,9 +59,11 @@ public class ActUserTaskConsumer extends AbstractMessageListener<ActUserTaskMqMe
|
||||
|
||||
@Autowired
|
||||
private MQMessageProducer mqMessageProducer;
|
||||
@Autowired
|
||||
private JedisLockService jedisLockService;
|
||||
|
||||
@Override
|
||||
protected void onMessage(ActUserTaskMqMessage object) throws Exception {
|
||||
protected void onMessage(ActUserTaskMqMessage object) {
|
||||
Long sendUid = object.getSendUid();
|
||||
Long receiveUid = object.getReceiveUid();
|
||||
Date messTime = new Date(object.getMessTime());
|
||||
|
@@ -1,6 +1,5 @@
|
||||
package com.accompany.business.event.listener.charge;
|
||||
|
||||
import com.accompany.business.service.mycard.MyCardBizService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.model.ChargeMqMessage;
|
||||
import com.accompany.mq.producer.MQMessageProducer;
|
||||
@@ -21,9 +20,6 @@ import org.springframework.stereotype.Component;
|
||||
@Component
|
||||
public class MyCardChargeListener implements ApplicationListener<ChargeEvent> {
|
||||
|
||||
@Autowired
|
||||
private MyCardBizService myCardBizService;
|
||||
|
||||
@Autowired
|
||||
private MQMessageProducer mqMessageProducer;
|
||||
|
||||
|
@@ -76,7 +76,7 @@
|
||||
<tencentcloud-sdk-java.version>3.1.781</tencentcloud-sdk-java.version>
|
||||
<tencentcloud-cos-sdk-java.version>5.6.253</tencentcloud-cos-sdk-java.version>
|
||||
<tencentcloud-cos-sts-sdk-java.version>3.1.1</tencentcloud-cos-sts-sdk-java.version>
|
||||
<rocketmq-spring-boot.version>2.3.3</rocketmq-spring-boot.version>
|
||||
<rocketmq-spring-boot.version>2.3.4</rocketmq-spring-boot.version>
|
||||
<kaptcha.version>2.3.2</kaptcha.version>
|
||||
<hippo4j-core.version>1.5.0</hippo4j-core.version>
|
||||
<android-json.version>0.0.20131108.vaadin1</android-json.version>
|
||||
|
@@ -1,17 +1,14 @@
|
||||
package com.accompany.mq.listener;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.core.service.common.JedisLockService;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.common.status.BusiStatus;
|
||||
import com.accompany.core.exception.ServiceException;
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* @author: liaozetao
|
||||
@@ -21,44 +18,21 @@ import java.util.UUID;
|
||||
@Slf4j
|
||||
public abstract class AbstractMessageListener<T extends BaseMqMessage> implements RocketMQListener<String> {
|
||||
|
||||
private static final int MQ_LOCK_SECONDS = 30 * 60;
|
||||
|
||||
@Autowired
|
||||
protected JedisService jedisService;
|
||||
|
||||
@Autowired
|
||||
protected JedisLockService jedisLockService;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
//log.info("====mq message start====");
|
||||
//log.info("text message : {}", message);
|
||||
if (!message.startsWith(StrUtil.DELIM_START) || !message.endsWith(StrUtil.DELIM_END)) {
|
||||
return;
|
||||
}
|
||||
T mqMessage = JSONObject.parseObject(message)
|
||||
.toJavaObject(((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]);
|
||||
if (mqMessage == null) {
|
||||
return;
|
||||
}
|
||||
//String messId = mqMessage.getMessId();
|
||||
String messId = UUID.randomUUID().toString();
|
||||
//防止消息被重复消费
|
||||
RedisKey mqLock = mqLock();
|
||||
if (mqLock != null && !jedisService.setnx(mqLock.getKey(messId), Boolean.TRUE.toString(), MQ_LOCK_SECONDS)) {
|
||||
log.error("mq lock : {}, message had handle, msg : {}", mqLock.getKey(messId), message);
|
||||
return;
|
||||
}
|
||||
onMessage(mqMessage);
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage(), e);
|
||||
//log.info("====mq message start====");
|
||||
//log.info("text message : {}", message);
|
||||
if (!message.startsWith(StrUtil.DELIM_START) || !message.endsWith(StrUtil.DELIM_END)) {
|
||||
return;
|
||||
}
|
||||
T mqMessage = JSONObject.parseObject(message)
|
||||
.toJavaObject(((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]);
|
||||
if (mqMessage == null) {
|
||||
throw new ServiceException(BusiStatus.PARAMERROR);
|
||||
}
|
||||
onMessage(mqMessage);
|
||||
}
|
||||
|
||||
protected abstract void onMessage(T object) throws Exception;
|
||||
protected abstract void onMessage(T object);
|
||||
|
||||
protected RedisKey mqLock() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@@ -4,6 +4,7 @@ import cn.hutool.core.util.StrUtil;
|
||||
import com.accompany.business.service.mycard.MyCardBizService;
|
||||
import com.accompany.common.constant.Constant;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.accompany.mq.model.ChargeMqMessage;
|
||||
@@ -36,9 +37,11 @@ public class MyCardChargeMessageConsumer extends AbstractMessageListener<ChargeM
|
||||
|
||||
@Autowired
|
||||
private MQMessageProducer mqMessageProducer;
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
|
||||
@Override
|
||||
protected void onMessage(ChargeMqMessage object) throws Exception {
|
||||
protected void onMessage(ChargeMqMessage object) {
|
||||
String chargeRecordId = object.getChargeRecordId();
|
||||
ChargeRecord chargeRecord = chargeRecordService.getChargeRecordById(chargeRecordId);
|
||||
if (chargeRecord == null) {
|
||||
|
Reference in New Issue
Block a user