rocketmq-替换除开奖和旧线性奖池外

This commit is contained in:
2023-03-14 20:31:10 +08:00
parent cb28b5974a
commit dea69418d5
39 changed files with 718 additions and 142 deletions

View File

@@ -57,8 +57,6 @@ public class JmsConfig {
public final static String PAY_FINISH_QUEUE = "pay-finish-queue";
public final static String AD_PLATFORM_USER_LOGIN_MSG_QUEUE = "ad_platform_user_login_msg_queue";
public final static String LINEARLY_POOL_DEFAULT_QUEUE = "linearly-pool-draw-queue";
@@ -306,16 +304,6 @@ public class JmsConfig {
return factory;
}
@Bean
public Queue adPlatfromUserLoginMsgQueue() { return new ActiveMQQueue(AD_PLATFORM_USER_LOGIN_MSG_QUEUE); }
@Bean("adPlatformLoginMsgJmsTemplate")
public JmsTemplate adPlatformLoginMsgJmsTemplate(@Qualifier("myConnectionFactory")ConnectionFactory activeMQConnectionFactory) {
JmsTemplate template = new JmsTemplate(activeMQConnectionFactory);
template.setDefaultDestination(adPlatfromUserLoginMsgQueue());
return template;
}
@Bean("adPlatformUserLoginMsgContainer")
public DefaultJmsListenerContainerFactory adPlatformUserLoginMsgContainer() {

View File

@@ -48,6 +48,13 @@ activemq:
maxConnections: 50
idleTimeout: 30000
## rocketmq 配置
rocketmq:
name-server: 128.1.134.148:9876
producer:
group: peko-group
sendMessageTimeout: 300000
## ES配置
elasticsearch:
clusterName: elasticsearch
@@ -56,4 +63,4 @@ elasticsearch:
username: elastic
password: xuanyin@es123
roomIndex: yinyou_dev_room
userIndex: yinyou_dev_users_202204121516
userIndex: yinyou_dev_users_202204121516

View File

@@ -56,8 +56,6 @@ public class JmsConfig {
public final static String PAY_FINISH_QUEUE = "pay-finish-queue";
public final static String AD_PLATFORM_USER_LOGIN_MSG_QUEUE = "ad_platform_user_login_msg_queue";
public final static String LINEARLY_POOL_DEFAULT_QUEUE = "linearly-pool-draw-queue";
@@ -308,17 +306,6 @@ public class JmsConfig {
return factory;
}
@Bean
public Queue adPlatfromUserLoginMsgQueue() { return new ActiveMQQueue(AD_PLATFORM_USER_LOGIN_MSG_QUEUE); }
@Bean("adPlatformLoginMsgJmsTemplate")
public JmsTemplate adPlatformLoginMsgJmsTemplate(@Qualifier("myConnectionFactory")ConnectionFactory activeMQConnectionFactory) {
JmsTemplate template = new JmsTemplate(activeMQConnectionFactory);
template.setDefaultDestination(adPlatfromUserLoginMsgQueue());
return template;
}
@Bean("adPlatformUserLoginMsgContainer")
public DefaultJmsListenerContainerFactory adPlatformUserLoginMsgContainer() {
DefaultJmsListenerContainerFactory factory

View File

@@ -48,6 +48,13 @@ activemq:
maxConnections: 50
idleTimeout: 30000
## rocketmq 配置
rocketmq:
name-server: 128.1.134.148:9876
producer:
group: peko-group
sendMessageTimeout: 300000
## ES配置
elasticsearch:
clusterName: elasticsearch

View File

@@ -1779,8 +1779,9 @@ public enum RedisKey {
valentine_cp_draw_lock,
label,
room_first_charge_window;
room_first_charge_window,
mq_user_first_login_status,
;

View File

@@ -1,4 +1,4 @@
package com.accompany.business.mq;
package com.accompany.business.event.listener;
import com.accompany.business.event.GiftMessageEvent;
import com.accompany.business.message.GiftMessage;

View File

@@ -3,6 +3,7 @@ package com.accompany.business.service;
import com.accompany.business.event.ChargeSuccessEvent;
import com.accompany.business.message.PayFinishMessage;
import com.accompany.business.service.mq.ActiveMQService;
import com.accompany.business.service.mq.RocketMQService;
import com.accompany.business.service.purse.UserPurseService;
import com.accompany.business.service.record.BillRecordService;
import com.accompany.business.service.user.UsersService;
@@ -66,6 +67,8 @@ public class ChargeRecordUpdateService extends BaseService {
@Autowired
private ActiveMQService activeMQService;
@Autowired
private RocketMQService rocketMQService;
@Autowired
private ApplicationContext applicationContext;
@@ -352,7 +355,8 @@ public class ChargeRecordUpdateService extends BaseService {
message.setMessTime(System.currentTimeMillis());
//保存一份到redis中
jedisService.hwrite(RedisKey.mq_pay_finish_status.getKey(), message.getMessId(), gson.toJson(chargeRecord));
activeMQService.sendPayFinishMessage(message);
//activeMQService.sendPayFinishMessage(message);
rocketMQService.sendPayFinishMessage(message);
}
/**

View File

@@ -3,6 +3,7 @@ package com.accompany.business.service;
import com.accompany.business.annotation.CancelUidTag;
import com.accompany.business.event.ChargeSuccessEvent;
import com.accompany.business.message.PayFinishMessage;
import com.accompany.business.service.mq.RocketMQService;
import com.accompany.common.utils.BeanUtil;
import com.accompany.core.service.base.BaseService;
import com.accompany.payment.dto.AreaChargeConfigDTO;
@@ -66,10 +67,6 @@ public class ChargeService extends BaseService {
private UsersService usersService;
@Autowired
private PayCenterService payCenterService;
@Resource(name = "bizExecutor")
private TaskExecutor bizExecutor;
@Autowired
private AliyunSmsService aliyunSmsService;
@Autowired
private UserPurseService userPurseService;
@Autowired
@@ -81,6 +78,8 @@ public class ChargeService extends BaseService {
@Autowired
private ActiveMQService activeMQService;
@Autowired
private RocketMQService rocketMQService;
@Autowired
private SysConfService sysConfService;
@Autowired
private LuckyTarotRecordService luckyTarotRecordService;
@@ -435,7 +434,8 @@ public class ChargeService extends BaseService {
message.setMessTime(System.currentTimeMillis());
//保存一份到redis中
jedisService.hwrite(RedisKey.mq_pay_finish_status.getKey(), message.getMessId(), gson.toJson(chargeRecord));
activeMQService.sendPayFinishMessage(message);
//activeMQService.sendPayFinishMessage(message);
rocketMQService.sendPayFinishMessage(message);
}

View File

@@ -7,6 +7,7 @@ import com.accompany.business.mybatismapper.ActivityAwardMapper;
import com.accompany.business.mybatismapper.ActivityPackItemMapper;
import com.accompany.business.mybatismapper.ActivityPackMapper;
import com.accompany.business.service.mq.ActiveMQService;
import com.accompany.business.service.mq.RocketMQService;
import com.accompany.business.service.purse.UserPurseService;
import com.accompany.business.service.user.UsersService;
import com.accompany.business.util.ReplaceDomainUtil;
@@ -66,6 +67,8 @@ public class ActivityPackService implements InitializingBean, BeanSelfAware {
@Autowired
private ActiveMQService activeMQService;
@Autowired
private RocketMQService rocketMQService;
@Autowired
private ActivityPackItemMapper activityPackItemMapper;
@Autowired
private UsersService usersService;
@@ -308,7 +311,8 @@ public class ActivityPackService implements InitializingBean, BeanSelfAware {
message.setPackType(packType);
// 缓存消息的消费状态,便于队列消息做幂等处理
jedisService.hwrite(RedisKey.mq_pack_status.getKey(), message.getMessId(), gson.toJson(message));
activeMQService.sendActivityPackMessage(message);
//activeMQService.sendActivityPackMessage(message);
rocketMQService.sendActivityPackMessage(message);
}
public ActivityPack getPackById(int packId){

View File

@@ -407,7 +407,7 @@ public class GiftSendService extends BaseService {
GiftMessage message = buildGiftMessage(sendUid, recvUid, roomUid, giftId, giftConsumeType, giftType, giftNum, goldNum, sendType, roomType, msg, giftSource, luckyBagGift);
// 缓存消息的消费状态,便于队列消息做幂等处理
jedisService.hwrite(RedisKey.mq_gift_status.getKey(), message.getMessId(), gson.toJson(message));
activeMQService.sendGiftMessage(message);
//activeMQService.sendGiftMessage(message);
rocketMQService.sendGiftMessage(message);
//通知观察者已经送出礼物
try {

View File

@@ -3,6 +3,7 @@ package com.accompany.business.service.gift;
import com.accompany.business.message.RadishGiftMessage;
import com.accompany.business.model.Gift;
import com.accompany.business.service.mq.ActiveMQService;
import com.accompany.business.service.mq.RocketMQService;
import com.accompany.business.service.purse.UserWalletService;
import com.accompany.business.service.room.RoomService;
import com.accompany.business.service.user.UserBackpackService;
@@ -44,9 +45,9 @@ public class RadishSendService extends BaseService {
@Autowired
private ActiveMQService activeMQService;
@Autowired
private UsersService usersService;
private RocketMQService rocketMQService;
@Autowired
private SysConfService sysConfService;
private UsersService usersService;
@Autowired
private UserBackpackService userBackpackService;
@Autowired
@@ -125,8 +126,8 @@ public class RadishSendService extends BaseService {
sendType, roomType, msg, giftSource);
// 缓存消息的消费状态,便于队列消息做幂等处理
jedisService.hwrite(RedisKey.mq_radish_gift_status.getKey(), message.getMessId(), gson.toJson(message));
activeMQService.sendRadishGiftMessage(message);
//activeMQService.sendRadishGiftMessage(message);
rocketMQService.sendRadishGiftMessage(message);
}
private RadishGiftMessage buildRadishGiftMessage(Long sendUid, Long receiveUid, Long roomUid, Integer giftId, Integer giftNum,

View File

@@ -5,6 +5,7 @@ import com.accompany.business.model.MusicLibrary;
import com.accompany.business.model.MusicLibraryExample;
import com.accompany.business.service.ErBanNetEaseService;
import com.accompany.business.service.mq.ActiveMQService;
import com.accompany.business.service.mq.RocketMQService;
import com.accompany.business.service.room.RoomService;
import com.accompany.business.service.user.UsersService;
import com.accompany.business.util.ReplaceDomainUtil;
@@ -62,6 +63,8 @@ public class UserChooseMusicService extends BaseService {
@Autowired
private ActiveMQService activeMQService;
@Autowired
private RocketMQService rocketMQService;
@Autowired
private JedisService jedisService;
//点歌最大上限
private final int MAX_CHOOSE_MUSIC_COUNT = 50;
@@ -432,8 +435,9 @@ public class UserChooseMusicService extends BaseService {
message.setMessTime(System.currentTimeMillis());
message.setRoomUid(roomUid);
message.setUid(uid);
activeMQService.sendCleanMusicDelayMessage(message);
//activeMQService.sendCleanMusicDelayMessage(message);
jedisService.hwrite(RedisKey.room_clean_music_messId.getKey(), roomUid + "_" + uid, messId);
rocketMQService.sendCleanMusicDelayMessage(message);
}
}

View File

@@ -0,0 +1,35 @@
package com.accompany.business.service.mq;
public class RocketMQConstant {
public final static String GIFT_TOPIC = "gift";
public final static String GIFT_CONSUME_GROUP = "gift_consume_group";
public final static String CLEAN_MUSIC_TOPIC = "clean_music";
public final static String CLEAN_MUSIC_CONSUME_GROUP = "clean_music_consume_group";
public final static String ACTIVITY_PACK_TOPIC = "activity_pack";
public final static String ACTIVITY_PACK_CONSUME_GROUP = "activity_pack_consume_group";
public final static String SIGN_DRAW_GOLD_TOPIC = "sign_draw_gold";
public final static String SIGN_DRAW_GOLD_CONSUME_GROUP = "sign_draw_gold_consume_group";
public final static String SIGN_TOPIC = "sign";
public final static String SIGN_CONSUME_GROUP = "sign_consume_group";
public final static String RADISH_GIFT_TOPIC = "radish_gift";
public final static String RADISH_GIFT_CONSUME_GROUP = "radish_gift_consume_group";
public final static String PAY_FINISH_TOPIC = "pay_finish";
public final static String PAY_FINISH_CONSUME_GROUP = "pay_finish_consume_group";
public final static String USER_FIRST_LOGIN_TOPIC = "user_first_login";
public final static String USER_FIRST_LOGIN_CONSUME_GROUP = "user_first_login_consume_group";
public final static String VOICE_LIKE_TOPIC = "voice_like";
public final static String VOICE_LIKE_CONSUME_GROUP = "voice_like_consume_group";
public final static String YIDUN_TEXT_ANTI_TOPIC = "yidun_text_anti";
public final static String YIDUN_TEXT_ANTI_CONSUME_GROUP = "yidun_text_anti_consume_group";
}

View File

@@ -1,6 +1,8 @@
package com.accompany.business.service.mq;
import com.accompany.business.message.GiftMessage;
import com.accompany.business.message.*;
import com.accompany.business.message.linearlypool.LinearlyPoolPrizeMessage;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
@@ -23,7 +25,93 @@ public class RocketMQService {
*/
public void sendGiftMessage(GiftMessage giftMessage) {
log.info("sendGiftMessage gift message: {}", giftMessage);
rocketMQTemplate.convertAndSend("gift-queue",giftMessage);
rocketMQTemplate.convertAndSend(RocketMQConstant.GIFT_TOPIC, giftMessage);
}
/**
* 发送开箱子中奖消息
*/
public void sendOpenBoxMessage(BoxPrizeMessage message, String queueName) {
log.info("sendOpenBoxMessage on {}, message: {}", queueName, JSON.toJSONString(message));
rocketMQTemplate.convertAndSend(queueName,message);
}
public void sendLinearlyPoolDrawMessage(LinearlyPoolPrizeMessage message, String queueName) {
log.info("sendLinearlyPoolDrawMessage on {}, message: {}", queueName, JSON.toJSONString(message));
rocketMQTemplate.convertAndSend(queueName,message);
}
/**
* 发送清歌的延迟消息
* @param message
*/
public void sendCleanMusicDelayMessage(CleanMusicDelayMessage message) {
log.info("sendCleanMusicDelayMessage message: {}", JSON.toJSONString(message));
rocketMQTemplate.convertAndSend(RocketMQConstant.CLEAN_MUSIC_TOPIC, message);
}
/**
* 活动礼包消息发送到MQ
*
* @param packMessage
*/
public void sendActivityPackMessage(ActivityPackMessage packMessage) {
log.info("sendGiftMessage gift message: {}", packMessage);
rocketMQTemplate.convertAndSend(RocketMQConstant.ACTIVITY_PACK_TOPIC, packMessage);
}
/**
* 发送签到瓜分金币中奖消息
* @param message
*/
public void sendSignDrawGoldMessage(SignDrawGoldMessage message) {
log.info("sendSignDrawGoldMessage message: {}", JSON.toJSONString(message));
rocketMQTemplate.convertAndSend(RocketMQConstant.SIGN_DRAW_GOLD_TOPIC, message);
}
/**
* 发送签到消息
* @param message
*/
public void sendSignMessage(SignMessage message) {
log.info("sendSignMessage message: {}", JSON.toJSONString(message));
rocketMQTemplate.convertAndSend(RocketMQConstant.SIGN_TOPIC, message);
}
/**
* 发送萝卜礼物消息
* @param message
*/
public void sendRadishGiftMessage(RadishGiftMessage message) {
log.info("sendRadishGiftMessage message: {}", JSON.toJSONString(message));
rocketMQTemplate.convertAndSend(RocketMQConstant.RADISH_GIFT_TOPIC, message);
}
/**
* 支付完成的消息发送到MQ
*
* @param message
*/
public void sendPayFinishMessage(PayFinishMessage message) {
log.info("send payFinish message: {}", JSON.toJSONString(message));
rocketMQTemplate.convertAndSend(RocketMQConstant.PAY_FINISH_TOPIC, message);
}
/**
* 声音匹配 喜欢/不喜欢
* @param message
*/
public void sendVoiceLikeMessage(VoiceLikeMessage message) {
log.info("sendVoiceLikeMessage message: {}", JSON.toJSONString(message));
rocketMQTemplate.convertAndSend(RocketMQConstant.VOICE_LIKE_TOPIC, message);
}
/**
* 发送易盾im反垃圾消息
* @param msg
*/
public void sendYidunIMTextAntiMsg(YidunIMAntiMessage msg) {
log.info("sendYidunIMTextAntiMsg message: {}", JSON.toJSONString(msg));
rocketMQTemplate.convertAndSend(RocketMQConstant.YIDUN_TEXT_ANTI_TOPIC, msg);
}
}

View File

@@ -0,0 +1,40 @@
package com.accompany.business.service.mq.listener;
import com.accompany.business.message.ActivityPackMessage;
import com.accompany.business.service.activity.ActivityPackMessageService;
import com.accompany.business.service.mq.RocketMQConstant;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.utils.BlankUtil;
import com.accompany.core.service.common.JedisService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 礼包消息监听器
* @author xiaoyuyou
* @date 2018/9/5 11:03
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstant.ACTIVITY_PACK_TOPIC, consumerGroup = RocketMQConstant.ACTIVITY_PACK_CONSUME_GROUP)
public class ActivityPackMessageMQListener implements RocketMQListener<ActivityPackMessage> {
@Autowired
private JedisService jedisService;
@Autowired
private ActivityPackMessageService activityPackMessageService;
public void onMessage(ActivityPackMessage packMessage) {
log.info("handle activity pack message {}", JSON.toJSONString(packMessage));
// 判断该消息是否已经消费过
String messStatus = jedisService.hget(RedisKey.mq_pack_status.getKey(), packMessage.getMessId());
if (BlankUtil.isBlank(messStatus)) {
return;
}
activityPackMessageService.handleActivityPackMessage(packMessage);
}
}

View File

@@ -0,0 +1,52 @@
package com.accompany.business.service.mq.listener;
import com.accompany.business.message.CleanMusicDelayMessage;
import com.accompany.business.service.ktv.UserChooseMusicService;
import com.accompany.business.service.mq.RocketMQConstant;
import com.accompany.business.service.user.UserInRoomService;
import com.accompany.business.vo.RoomVo;
import com.accompany.common.redis.RedisKey;
import com.accompany.core.service.common.JedisService;
import com.alibaba.fastjson.JSON;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 清歌消息监听器
* @author xiaoyuyou
* @date 2018/9/5 11:03
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstant.CLEAN_MUSIC_TOPIC, consumerGroup = RocketMQConstant.CLEAN_MUSIC_CONSUME_GROUP)
public class CleanMusicDelayMessageMQListener implements RocketMQListener<CleanMusicDelayMessage> {
@Autowired
private UserChooseMusicService userChooseMusicService;
@Autowired
private UserInRoomService userInRoomService;
@Autowired
private JedisService jedisService;
@SneakyThrows
public void onMessage(CleanMusicDelayMessage cleanMusicMsg) {
log.info("clean music delay message {}", JSON.toJSONString(cleanMusicMsg));
Long roomUid = cleanMusicMsg.getRoomUid();
Long uid = cleanMusicMsg.getUid();
String messId = jedisService.hget(RedisKey.room_clean_music_messId.getKey(), roomUid + "_" + uid);
log.info("clean music listener exec, roomUid:{}, uid:{}, period:{}, redis messId:{}, current messId:{}",
roomUid, uid, System.currentTimeMillis() - cleanMusicMsg.getMessTime(), messId, cleanMusicMsg.getMessId());
if (StringUtils.isEmpty(messId) || !StringUtils.equals(messId, cleanMusicMsg.getMessId())){
return;
}
RoomVo roomVo = userInRoomService.getUserInRoomInfoCache(uid);
if (roomVo == null || roomVo.getUid().longValue() != roomUid.longValue()){
userChooseMusicService.deleteUserAllChooseMusic(roomUid, uid);
}
}
}

View File

@@ -0,0 +1,35 @@
package com.accompany.business.service.mq.listener;
import com.accompany.business.message.GiftMessage;
import com.accompany.business.service.gift.GiftMessageService;
import com.accompany.business.service.mq.RocketMQConstant;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.utils.BlankUtil;
import com.accompany.core.service.common.JedisService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstant.GIFT_TOPIC, consumerGroup = RocketMQConstant.GIFT_CONSUME_GROUP)
public class GiftMessageMQListener implements RocketMQListener<GiftMessage> {
@Autowired
private JedisService jedisService;
@Autowired
private GiftMessageService giftMessageService;
public void onMessage(GiftMessage giftMessage) {
log.info("onMessage giftMessage: {}", giftMessage.toString());
// 判断该消息是否已经消费过
String messStatus = jedisService.hget(RedisKey.mq_gift_status.getKey(), giftMessage.getMessId());
if (BlankUtil.isBlank(messStatus)) {
return;
}
giftMessageService.handleGiftMessage(giftMessage);
}
}

View File

@@ -0,0 +1,49 @@
package com.accompany.business.service.mq.listener;
import com.accompany.business.message.PayFinishMessage;
import com.accompany.business.service.activity.ChargeActivityPackRecordService;
import com.accompany.business.service.mq.RocketMQConstant;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.utils.BlankUtil;
import com.accompany.core.service.common.JedisService;
import com.accompany.payment.model.ChargeRecord;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 处理支付完成后的一些操作
* 如:
* 1. 充值送礼包:充值完成后,判断用户的充值金额、次数等,并发放相应的礼包
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstant.PAY_FINISH_TOPIC, consumerGroup = RocketMQConstant.PAY_FINISH_CONSUME_GROUP)
public class PayFinishMessageMQListener implements RocketMQListener<PayFinishMessage> {
@Autowired
private JedisService jedisService;
@Autowired
private ChargeActivityPackRecordService chargeActivityPackRecordService;
public void onMessage(PayFinishMessage payFinishMessage) {
log.info("onMessage payFinishMessage: {}", JSON.toJSONString(payFinishMessage));
// 判断该消息是否已经消费过
String messStatus = jedisService.hget(RedisKey.mq_pay_finish_status.getKey(), payFinishMessage.getMessId());
if (BlankUtil.isBlank(messStatus)) {
log.info("消息已处理消息id:{}", payFinishMessage.getMessId());
return;
}
//处理具体逻辑
ChargeRecord chargeRecord = JSON.parseObject(messStatus, ChargeRecord.class);
//处理首充礼包
chargeActivityPackRecordService.handleFirstChargeActivityPack(chargeRecord);
//消费完成,删除该消息
jedisService.hdel(RedisKey.mq_pay_finish_status.getKey(), payFinishMessage.getMessId());
}
}

View File

@@ -0,0 +1,36 @@
package com.accompany.business.service.mq.listener;
import com.accompany.business.message.RadishGiftMessage;
import com.accompany.business.service.gift.RadishGiftMessageService;
import com.accompany.business.service.mq.RocketMQConstant;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.utils.BlankUtil;
import com.accompany.core.service.common.JedisService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstant.RADISH_GIFT_TOPIC, consumerGroup = RocketMQConstant.RADISH_GIFT_CONSUME_GROUP)
public class RadishGiftMessageMQListener implements RocketMQListener<RadishGiftMessage> {
@Autowired
private JedisService jedisService;
@Autowired
private RadishGiftMessageService radishGiftMessageService;
public void onMessage(RadishGiftMessage radishGiftMessage) {
log.info("onMessage radishGiftMessage: {}", radishGiftMessage.toString());
// 判断该消息是否已经消费过
String messStatus = jedisService.hget(RedisKey.mq_radish_gift_status.getKey(), radishGiftMessage.getMessId());
if (BlankUtil.isBlank(messStatus)) {
return;
}
radishGiftMessageService.handleRadishGiftMessage(radishGiftMessage);
}
}

View File

@@ -0,0 +1,44 @@
package com.accompany.business.service.mq.listener;
import com.accompany.business.message.SignDrawGoldMessage;
import com.accompany.business.service.mq.RocketMQConstant;
import com.accompany.business.service.signweb.SignDrawGoldService;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.utils.BlankUtil;
import com.accompany.core.service.common.JedisService;
import com.alibaba.fastjson.JSON;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
/**
* 瓜分金币
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstant.SIGN_DRAW_GOLD_TOPIC, consumerGroup = RocketMQConstant.SIGN_DRAW_GOLD_CONSUME_GROUP)
public class SignDrawGoldMessageMQListener implements RocketMQListener<SignDrawGoldMessage> {
@Autowired
private JedisService jedisService;
@Autowired
private SignDrawGoldService signDrawGoldService;
@SneakyThrows
@JmsListener(destination = "sign-draw-gold-queue", containerFactory = "signDrawGoldContainer")
public void onMessage(SignDrawGoldMessage signDrawGoldMessage) {
log.info("onMessage signDrawGoldMessage: {}", JSON.toJSONString(signDrawGoldMessage));
// 判断该消息是否已经消费过
String messStatus = jedisService.hget(RedisKey.mq_sign_draw_gold_status.getKey(), signDrawGoldMessage.getMessId());
if (BlankUtil.isBlank(messStatus)) {
return;
}
signDrawGoldService.handleDrawGoldMessage(signDrawGoldMessage);
}
}

View File

@@ -0,0 +1,42 @@
package com.accompany.business.service.mq.listener;
import com.accompany.business.message.SignMessage;
import com.accompany.business.service.mq.RocketMQConstant;
import com.accompany.business.service.signweb.SignService;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.utils.BlankUtil;
import com.accompany.core.service.common.JedisService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
* 每日签到奖励发放
*/
@Component
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstant.SIGN_TOPIC, consumerGroup = RocketMQConstant.SIGN_CONSUME_GROUP)
public class SignMessageMQListener implements RocketMQListener<SignMessage> {
@Autowired
private JedisService jedisService;
@Autowired
private SignService signService;
public void onMessage(SignMessage signMessage) {
log.info("onMessage signMessage: {}", JSON.toJSONString(signMessage));
// 判断该消息是否已经消费过
String messStatus = jedisService.hget(RedisKey.mq_sign_status.getKey(), signMessage.getMessId());
if (BlankUtil.isBlank(messStatus)) {
return;
}
signService.handlerSign(signMessage);
}
}

View File

@@ -0,0 +1,67 @@
package com.accompany.business.service.mq.listener;
import com.accompany.business.service.gamemange.GameManageAccessTicketBizService;
import com.accompany.business.service.mq.RocketMQConstant;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.utils.BlankUtil;
import com.accompany.core.service.common.JedisService;
import com.accompany.core.vo.UserFirstLoginMsgVO;
import com.alibaba.fastjson.JSON;
import com.xuanyin.gamematch.constants.GameManageAccessTicketEnum;
import com.xuanyin.gamematch.handler.ticketaccessstatey.ITicketAccessStategy;
import com.xuanyin.gamematch.handler.ticketaccessstatey.TicketAccessStategyFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstant.USER_FIRST_LOGIN_TOPIC, consumerGroup = RocketMQConstant.USER_FIRST_LOGIN_CONSUME_GROUP)
public class UserFirstLoginMessageMQListener implements RocketMQListener<UserFirstLoginMsgVO> {
@Autowired
private JedisService jedisService;
@Autowired
private TicketAccessStategyFactory factory;
@Autowired
private GameManageAccessTicketBizService gameManageAccessTicketBizService;
public void onMessage(UserFirstLoginMsgVO msg) {
log.info("用户首登消息队列, msg: {} ", JSON.toJSONString(msg));
// 判断该消息是否已经消费过
String messStatus = jedisService.hget(RedisKey.mq_user_first_login_status.getKey(), msg.getUid().toString());
if (BlankUtil.isBlank(messStatus)) {
return;
}
handleDayFirstLogin(msg);
handleWeekFirstLogin(msg);
jedisService.hdel(RedisKey.mq_user_first_login_status.getKey(), msg.getUid().toString());
}
@Async
void handleDayFirstLogin(UserFirstLoginMsgVO msg) {
if (msg.getIsDayFirstLogin()) {
log.info("处理用户每日首登 uid {}", msg.getUid());
ITicketAccessStategy stategy = factory.getInstance(GameManageAccessTicketEnum.DAY_LOGIN.getValue());
stategy.doSend(msg.getUid());
gameManageAccessTicketBizService.setTicketAccessInfoToCache(stategy.getAccessType().getValue(), msg.getUid());
}
}
@Async
void handleWeekFirstLogin(UserFirstLoginMsgVO msg) {
if (msg.getIsWeekFirstLogin()) {
log.info("处理用户每周首登 uid {}", msg.getUid());
ITicketAccessStategy stategy = factory.getInstance(GameManageAccessTicketEnum.WEEK_LOGIN.getValue());
stategy.doSend(msg.getUid());
gameManageAccessTicketBizService.setTicketAccessInfoToCache(stategy.getAccessType().getValue(), msg.getUid());
}
}
}

View File

@@ -0,0 +1,43 @@
package com.accompany.business.service.mq.listener;
import com.accompany.business.message.VoiceLikeMessage;
import com.accompany.business.service.mq.RocketMQConstant;
import com.accompany.business.service.voice.VoiceService;
import com.accompany.common.redis.RedisKey;
import com.accompany.core.service.common.JedisService;
import com.accompany.core.util.StringUtils;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author chucheng
* @date 2019-06-05
* @description 声音瓶子 喜欢/不喜欢消息监听
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstant.VOICE_LIKE_TOPIC, consumerGroup = RocketMQConstant.VOICE_LIKE_CONSUME_GROUP)
public class VoiceLikeMessageMQListener implements RocketMQListener<VoiceLikeMessage> {
@Autowired
private VoiceService voiceService;
@Autowired
private JedisService jedisService;
public void onMessage(VoiceLikeMessage message) {
log.info("onMessage VoiceLikeMessage: {}", JSON.toJSONString(message));
// 判断该消息是否已经消费过
String messStatus = jedisService.hget(RedisKey.voice_like_status.getKey(), message.getMessId());
if (StringUtils.isBlank(messStatus)) {
log.error("handleVoiceLikeMessage status error.message = {}", JSON.toJSONString(message));
return;
}
voiceService.handleVoiceLikeMessage(message);
}
}

View File

@@ -0,0 +1,40 @@
package com.accompany.business.service.mq.listener;
import com.accompany.business.message.YidunIMAntiMessage;
import com.accompany.business.service.mq.RocketMQConstant;
import com.accompany.business.service.netease.YidunAntiHandleService;
import com.accompany.common.redis.RedisKey;
import com.accompany.core.service.common.JedisService;
import com.accompany.core.util.StringUtils;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 易盾文本反垃圾消息处理
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstant.YIDUN_TEXT_ANTI_TOPIC, consumerGroup = RocketMQConstant.YIDUN_TEXT_ANTI_CONSUME_GROUP)
public class YidunIMTextAntiMessageMQListener implements RocketMQListener<YidunIMAntiMessage> {
@Autowired
private YidunAntiHandleService yidunAntiHandleService;
@Autowired
private JedisService jedisService;
public void onMessage(YidunIMAntiMessage message) {
log.info("handle yidun im text anti message {}", JSON.toJSONString(message));
String messStatus = jedisService.hget(RedisKey.voice_like_status.getKey(), message.getMessId());
if (StringUtils.isBlank(messStatus)) {
log.error("handleYidunIMAntiMessage status error.message = {}", JSON.toJSONString(message));
return;
}
yidunAntiHandleService.handlIMTextAnti(message.getChatMsg());
// 删除消息标识
jedisService.hdel(RedisKey.mq_yindun_im_text_anti_status.getKey(), message.getMessId());
}
}

View File

@@ -13,6 +13,7 @@ import com.accompany.business.message.YidunIMAntiMessage;
import com.accompany.business.param.LeftChatRoomParam;
import com.accompany.business.param.MicQueueParam;
import com.accompany.business.service.mq.ActiveMQService;
import com.accompany.business.service.mq.RocketMQService;
import com.accompany.business.service.room.*;
import com.accompany.business.service.user.UserInRoomService;
import com.accompany.business.service.useronline.UserOnlineService;
@@ -71,6 +72,8 @@ public class ReceiveNeteaseService extends BaseService {
@Autowired
private ActiveMQService activeMQService;
@Autowired
private RocketMQService rocketMQService;
@Autowired
private UserInOutRoomRecordService userInOutRoomRecordService;
public static final int TEMP_EXPIRE_SECOND = 15;
@@ -272,9 +275,10 @@ public class ReceiveNeteaseService extends BaseService {
&& isYindunNotPass(suggestion)
) {
YidunIMAntiMessage msg = buildYidunAntiMqMsg(chatMsg);
activeMQService.sendYidunIMTextAntiMsg(msg);
//activeMQService.sendYidunIMTextAntiMsg(msg);
// 缓存消息的消费状态,便于队列消息做幂等处理
jedisService.hwrite(RedisKey.mq_yindun_im_text_anti_status.getKey(), msg.getMessId(), gson.toJson(msg));
rocketMQService.sendYidunIMTextAntiMsg(msg);
}
}

View File

@@ -10,6 +10,7 @@ import com.accompany.business.service.ErBanNetEaseService;
import com.accompany.business.service.SendSysMsgService;
import com.accompany.business.service.box.BoxPrizeMessageService;
import com.accompany.business.service.mq.ActiveMQService;
import com.accompany.business.service.mq.RocketMQService;
import com.accompany.business.service.record.BillRecordService;
import com.accompany.business.service.purse.UserPurseService;
import com.accompany.business.service.user.UsersService;
@@ -46,44 +47,39 @@ import java.util.stream.Collectors;
*/
@Service
public class SignDrawGoldService extends BaseService {
@Autowired
GoldPrizeMapper goldPrizeMapper;
@Autowired
PrizeGoldPoolMapper prizeGoldPoolMapper;
@Autowired
DrawGoldRecordMapper drawGoldRecordMapper;
@Autowired
private SysConfService sysConfService;
@Autowired
private UserPurseService userPurseService;
@Autowired
UserSignRoundService userSignRoundService;
private UserSignRoundService userSignRoundService;
@Autowired
UserDrawStatisService userDrawStatisService;
private UserDrawStatisService userDrawStatisService;
@Autowired
PrizeGoldPoolService prizeGoldPoolService;
private PrizeGoldPoolService prizeGoldPoolService;
@Autowired
PrizeGoldWhiteService prizeGoldWhiteService;
private PrizeGoldWhiteService prizeGoldWhiteService;
@Autowired
GoldPrizeService goldPrizeService;
private GoldPrizeService goldPrizeService;
@Autowired
DrawGoldService drawGoldService;
private DrawGoldService drawGoldService;
@Autowired
UsersService usersService;
private UsersService usersService;
@Autowired
BoxPrizeMessageService boxPrizeMessageService;
@Autowired
ErBanNetEaseService erBanNetEaseService;
private BoxPrizeMessageService boxPrizeMessageService;
@Autowired
private ActiveMQService activeMQService;
@Autowired
private RocketMQService rocketMQService;
@Autowired
private UsersService userService;
@Autowired
private SignGoldService signGoldService;
@Autowired
private SendSysMsgService sendSysMsgService;
@Autowired
BillRecordService billRecordService;
private BillRecordService billRecordService;
@Async
public BusiResult draw(Long uid,BusiResult result){
@@ -334,9 +330,10 @@ public class SignDrawGoldService extends BaseService {
}
SignDrawGoldMessage message = buildDrawGoldMessage(uid,drawGoldRecord);
// 发送消息
activeMQService.sendSignDrawGoldMessage(message);
//activeMQService.sendSignDrawGoldMessage(message);
// 缓存消息的消费状态,便于队列消息做幂等处理
jedisService.hwrite(RedisKey.mq_sign_draw_gold_status.getKey(), message.getMessId(),gson.toJson(message));
rocketMQService.sendSignDrawGoldMessage(message);
}
/**

View File

@@ -1,5 +1,6 @@
package com.accompany.business.service.signweb;
import com.accompany.business.service.mq.RocketMQService;
import com.alibaba.fastjson.JSON;
import com.accompany.business.constant.PrizeTypeEnum;
import com.accompany.business.constant.RadishBillTypeEnum;
@@ -32,26 +33,16 @@ public class SignService extends BaseService {
@Autowired
private BoxPrizeStrategyFactory factory;
@Autowired
GoldPrizeMapper goldPrizeMapper;
private SignRewardConfigService signRewardConfigService;
@Autowired
PrizeGoldPoolMapper prizeGoldPoolMapper;
private UserSignRoundService userSignRoundService;
@Autowired
PrizeGoldPoolService prizeGoldPoolService;
@Autowired
GoldPrizeService goldPrizeService;
@Autowired
SignRewardConfigService signRewardConfigService;
@Autowired
UserSignRoundService userSignRoundService;
@Autowired
SignRecordService signRecordService;
@Autowired
SignPrizeRecordService signPrizeRecordService;
@Autowired
private NobleRightService nobleRightService;
private SignPrizeRecordService signPrizeRecordService;
@Autowired
private ActiveMQService activeMQService;
@Autowired
private RocketMQService rocketMQService;
@Autowired
private UserDrawStatisService userDrawStatisService;
/**
@@ -63,9 +54,10 @@ public class SignService extends BaseService {
SignMessage message = buildSignMessage(uid,signType);
message.setSignRecordId(signRecordId);
// 发送消息
activeMQService.sendSignMessage(message);
//activeMQService.sendSignMessage(message);
// 缓存消息的消费状态,便于队列消息做幂等处理
jedisService.hwrite(RedisKey.mq_sign_status.getKey(), message.getMessId(),gson.toJson(message));
rocketMQService.sendSignMessage(message);
}

View File

@@ -10,6 +10,7 @@ import com.accompany.business.mybatismapper.*;
import com.accompany.business.param.neteasepush.NeteasePushParam;
import com.accompany.business.service.SendSysMsgService;
import com.accompany.business.service.mq.ActiveMQService;
import com.accompany.business.service.mq.RocketMQService;
import com.accompany.business.service.user.UsersService;
import com.accompany.business.util.ReplaceDomainUtil;
import com.accompany.business.vo.PiaDramaVo;
@@ -36,7 +37,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.jsonwebtoken.lang.Assert;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
@@ -46,8 +46,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
;
/**
* @author chucheng
* @date 2019-05-30
@@ -64,6 +62,8 @@ public class VoiceService extends BaseService {
@Autowired
private ActiveMQService activeMQService;
@Autowired
private RocketMQService rocketMQService;
@Autowired
private UserVoiceMapper userVoiceMapper;
@@ -150,7 +150,8 @@ public class VoiceService extends BaseService {
// 缓存消息的消费状态
jedisService.hwrite(RedisKey.voice_like_status.getKey(), message.getMessId(), gson.toJson(message));
jedisService.hwrite(RedisKey.temp_voice_like_record.getKey(uid.toString()), message.getMessId(), voiceId+"");
activeMQService.sendVoiceLikeMessage(message);
//activeMQService.sendVoiceLikeMessage(message);
rocketMQService.sendVoiceLikeMessage(message);
}
if(VoiceLikeEnum.LIKE.getValue()==type){
// 私聊发送消息
@@ -496,12 +497,6 @@ public class VoiceService extends BaseService {
*/
@Transactional(rollbackFor = Exception.class)
public void handleVoiceLikeMessage(VoiceLikeMessage message){
// 判断该消息是否已经消费过
String messStatus = jedisService.hget(RedisKey.voice_like_status.getKey(), message.getMessId());
if (StringUtils.isBlank(messStatus)) {
logger.error("handleVoiceLikeMessage status error.message = {}", JSON.toJSONString(message));
return;
}
// 防止消息被重复消费
boolean lock = this.jedisService.setnx(RedisKey.voice_like_lock.getKey(message.getMessId()), "1",60);
if (!lock) {

View File

@@ -62,8 +62,6 @@ public class JmsConfig {
// 用户首登消息队列
public final static String USER_FIRST_LOGIN_MSG_QUEUE = "user_first_login_msg_queue";
public final static String AD_PLATFORM_USER_LOGIN_MSG_QUEUE = "ad_platform_user_login_msg_queue";
public final static String LINEARLY_POOL_DEFAULT_QUEUE = "linearly-pool-draw-queue";
@@ -105,9 +103,6 @@ public class JmsConfig {
@Bean
public Queue userFirstLoginMsgQueue() { return new ActiveMQQueue(USER_FIRST_LOGIN_MSG_QUEUE); }
@Bean
public Queue adPlatfromUserLoginMsgQueue() { return new ActiveMQQueue(AD_PLATFORM_USER_LOGIN_MSG_QUEUE); }
@Bean
public Queue linearlyPoolDefaultQueue() {
return new ActiveMQQueue(LINEARLY_POOL_DEFAULT_QUEUE);
@@ -354,13 +349,6 @@ public class JmsConfig {
return factory;
}
@Bean("adPlatformLoginMsgJmsTemplate")
public JmsTemplate adPlatformLoginMsgJmsTemplate(@Qualifier("myConnectionFactory")ConnectionFactory activeMQConnectionFactory) {
JmsTemplate template = new JmsTemplate(activeMQConnectionFactory);
template.setDefaultDestination(adPlatfromUserLoginMsgQueue());
return template;
}
@Bean("adPlatformUserLoginMsgContainer")
public DefaultJmsListenerContainerFactory adPlatformUserLoginMsgContainer() {

View File

@@ -1,5 +1,8 @@
package com.accompany.business.mq;
import com.accompany.common.redis.RedisKey;
import com.accompany.core.service.common.JedisService;
import com.accompany.core.util.StringUtils;
import com.alibaba.fastjson.JSON;
import com.accompany.business.message.VoiceLikeMessage;
import com.accompany.business.service.voice.VoiceService;
@@ -20,11 +23,19 @@ public class VoiceLikeMessageListener {
@Autowired
private VoiceService voiceService;
@Autowired
private JedisService jedisService;
@JmsListener(destination = "voice-like-queue",containerFactory = "jmsContainer3")
public void onMessage(VoiceLikeMessage message) {
try {
logger.info("onMessage VoiceLikeMessage: {}", JSON.toJSONString(message));
// 判断该消息是否已经消费过
String messStatus = jedisService.hget(RedisKey.voice_like_status.getKey(), message.getMessId());
if (StringUtils.isBlank(messStatus)) {
logger.error("handleVoiceLikeMessage status error.message = {}", JSON.toJSONString(message));
return;
}
voiceService.handleVoiceLikeMessage(message);
} catch (Exception e) {
logger.error("onMessage VoiceLikeMessage error, message={}", JSON.toJSONString(message), e);

View File

@@ -48,6 +48,13 @@ activemq:
maxConnections: 50
idleTimeout: 30000
## rocketmq 配置
rocketmq:
name-server: 128.1.134.148:9876
producer:
group: peko-group
sendMessageTimeout: 300000
## ES配置
elasticsearch:
clusterName: elasticsearch
@@ -58,12 +65,6 @@ elasticsearch:
roomIndex: yinyou_dev_room
userIndex: yinyou_dev_users_202204121516
## rocketmq 配置
rocketmq:
name-server: 128.1.134.148:9876
producer:
group: peko-group
sendMessageTimeout: 300000
server:
port: 8081

View File

@@ -98,12 +98,6 @@ public class ChannelContentServiceTest extends CommonTest {
@Autowired
@Qualifier( value = "adPlatformLoginMsgJmsTemplate")
private JmsTemplate adPlatformLoginMsgJmsTemplate;
@Test
public void test() {
UserFirstLoginMsgVO msg = new UserFirstLoginMsgVO();
msg.setUid(1003848L);
adPlatformLoginMsgJmsTemplate.convertAndSend("ad_platform_user_login_msg_queue", msg);
}
@Test
public void testAccountChannelByUid() {

View File

@@ -0,0 +1,8 @@
package com.accompany.oauth2.mq;
public class RocketMQConstant {
public final static String USER_FIRST_LOGIN_TOPIC = "user_first_login";
public final static String USER_FIRST_LOGIN_CONSUME_GROUP = "user_first_login_consume_group";
}

View File

@@ -0,0 +1,24 @@
package com.accompany.oauth2.mq;
import com.accompany.core.vo.UserFirstLoginMsgVO;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Created by 恒仔 on 2023/3/12.
*/
@Slf4j
@Service
public class RocketMQService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendUserFirstLoginMessage(UserFirstLoginMsgVO message) {
log.info("sendUserFirstLoginMessage message: {}", JSON.toJSONString(message));
rocketMQTemplate.convertAndSend(RocketMQConstant.USER_FIRST_LOGIN_TOPIC, message);
}
}

View File

@@ -7,6 +7,7 @@ import com.accompany.common.utils.GetTimeUtils;
import com.accompany.core.service.account.LoginRecordService;
import com.accompany.core.service.common.JedisService;
import com.accompany.core.vo.UserFirstLoginMsgVO;
import com.accompany.oauth2.mq.RocketMQService;
import com.accompany.oauth2.service.account.AccountManageService;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Maps;
@@ -45,11 +46,7 @@ public class TicketServices implements InitializingBean{
@Autowired
private LoginRecordService loginRecordService;
@Autowired
@Qualifier( value = "userFirstLoginMsgJmsTemplate")
private JmsTemplate userFirstLoginMsgJmsTemplate;
@Autowired
@Qualifier( value = "adPlatformLoginMsgJmsTemplate")
private JmsTemplate adPlatformLoginMsgJmsTemplate;
private RocketMQService rocketMQService;
@Override
public void afterPropertiesSet() {
@@ -134,14 +131,14 @@ public class TicketServices implements InitializingBean{
msg.setChannel(deviceInfo.getChannel());
msg.setDeviceId(deviceInfo.getDeviceId());
logger.info("用户登录uid[{}],channel[{}], deviceId {}", uid, deviceInfo.getChannel(), deviceInfo.getDeviceId());
adPlatformLoginMsgJmsTemplate.convertAndSend("ad_platform_user_login_msg_queue", msg);
boolean dayFirstLogin = isDayFirstLogin(uid);
boolean weekFirstLogin = isWeekFirstLogin(uid);
if (dayFirstLogin || weekFirstLogin) {
msg.setIsDayFirstLogin(dayFirstLogin);
msg.setIsWeekFirstLogin(weekFirstLogin);
logger.info("发送送赠门票队列, msg {}", JSON.toJSONString(msg));
userFirstLoginMsgJmsTemplate.convertAndSend("user_first_login_msg_queue", msg);
jedisService.hset(RedisKey.mq_user_first_login_status.getKey(), uid.toString(), JSON.toJSONString(msg));
rocketMQService.sendUserFirstLoginMessage(msg);
}
}

View File

@@ -45,7 +45,6 @@ public class JmsConfig {
public final static String MY_QUEUE = "spring-queue";
// 用户首登消息队列
public final static String USER_FIRST_LOGIN_MSG_QUEUE = "user_first_login_msg_queue";
public final static String AD_PLATFORM_USER_LOGIN_MSG_QUEUE = "ad_platform_user_login_msg_queue";
/**
* 定义点对点队列
@@ -98,9 +97,6 @@ public class JmsConfig {
@Bean
public Queue userFirstLoginMsgQueue() { return new ActiveMQQueue(USER_FIRST_LOGIN_MSG_QUEUE); }
@Bean
public Queue adPlatfromUserLoginMsgQueue() { return new ActiveMQQueue(AD_PLATFORM_USER_LOGIN_MSG_QUEUE); }
@Bean("userFirstLoginMsgJmsTemplate")
public JmsTemplate userFirstLoginMsgJmsTemplate(@Qualifier("myConnectionFactory")ConnectionFactory activeMQConnectionFactory) {
JmsTemplate template = new JmsTemplate(activeMQConnectionFactory);
@@ -120,13 +116,6 @@ public class JmsConfig {
return factory;
}
@Bean("adPlatformLoginMsgJmsTemplate")
public JmsTemplate adPlatformLoginMsgJmsTemplate(@Qualifier("myConnectionFactory")ConnectionFactory activeMQConnectionFactory) {
JmsTemplate template = new JmsTemplate(activeMQConnectionFactory);
template.setDefaultDestination(adPlatfromUserLoginMsgQueue());
return template;
}
@Bean("adPlatformUserLoginMsgContainer")
public DefaultJmsListenerContainerFactory adPlatformUserLoginMsgContainer() {

View File

@@ -48,6 +48,13 @@ activemq:
maxConnections: 50
idleTimeout: 30000
## rocketmq 配置
rocketmq:
name-server: 128.1.134.148:9876
producer:
group: peko-group
sendMessageTimeout: 300000
## ES配置
elasticsearch:
clusterName: elasticsearch

View File

@@ -290,18 +290,6 @@ public class JmsConfig {
return factory;
}
public final static String AD_PLATFORM_USER_LOGIN_MSG_QUEUE = "ad_platform_user_login_msg_queue";
@Bean
public Queue adPlatfromUserLoginMsgQueue() { return new ActiveMQQueue(AD_PLATFORM_USER_LOGIN_MSG_QUEUE); }
@Bean("adPlatformLoginMsgJmsTemplate")
public JmsTemplate adPlatformLoginMsgJmsTemplate(@Qualifier("myConnectionFactory")ConnectionFactory activeMQConnectionFactory) {
JmsTemplate template = new JmsTemplate(activeMQConnectionFactory);
template.setDefaultDestination(adPlatfromUserLoginMsgQueue());
return template;
}
@Bean("adPlatformUserLoginMsgContainer")
public DefaultJmsListenerContainerFactory adPlatformUserLoginMsgContainer() {

View File

@@ -48,6 +48,13 @@ activemq:
maxConnections: 50
idleTimeout: 30000
## rocketmq 配置
rocketmq:
name-server: 128.1.134.148:9876
producer:
group: peko-group
sendMessageTimeout: 300000
## ES配置
elasticsearch:
clusterName: elasticsearch