mq延时任务,添加移除房间管理员缓存,

This commit is contained in:
2025-05-09 18:48:33 +08:00
parent 78364f3000
commit edf8f51666
21 changed files with 308 additions and 38 deletions

View File

@@ -30,6 +30,9 @@ import com.accompany.business.service.user.UsersService;
import com.accompany.business.vo.RoomVo;
import com.accompany.common.constant.EmailConstant;
import com.accompany.common.device.DeviceInfo;
import com.accompany.common.netease.ErBanNetEaseService;
import com.accompany.common.netease.neteaseacc.result.RoomMemberRet;
import com.accompany.common.netease.util.NetEaseConstant;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.status.BusiStatus;
import com.accompany.common.utils.CommonUtil;
@@ -61,6 +64,7 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -79,6 +83,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
@Slf4j
@Service
public class MyApiService {
@@ -144,6 +149,10 @@ public class MyApiService {
@Autowired
private WeekGuildStatMapper weekGuildStatMapper;
@Autowired
private ErBanNetEaseService erBanNetEaseService;
@Autowired
private RedissonClient redissonClient;
@Autowired
private AgencyMonthSettleService agencyMonthSettleService;
@Autowired
private GuildUsdOperateService guildUsdOperateService;
@@ -736,4 +745,17 @@ public class MyApiService {
emailService.sendEmailCode(emailAddress, type.intValue(), new DeviceInfo(), "127.0.0.1", null, false);
}
public List<RoomMemberRet> reloadAdminMember(Boolean refreshCache, Long roomErbanNo) {
Long roomUid = null;
if (roomErbanNo != null) {
Users user = usersService.getUserByErbanNo(roomErbanNo);
RoomVo roomVoByUid = roomService.getRoomVoByUid(user.getUid());
if (roomVoByUid == null) {
return null;
}
roomUid = roomVoByUid.getUid();
}
return roomService.reloadAdminMember(refreshCache, roomUid);
}
}

View File

@@ -4,8 +4,9 @@ import cn.hutool.core.date.DateUtil;
import com.accompany.admin.service.userevent.vo.UserEventAdminVO;
import com.accompany.business.model.userevent.UserEvent;
import com.accompany.business.model.userevent.constant.UserEventConstant;
import com.accompany.business.model.userevent.vo.UserEventBeginMessage;
import com.accompany.business.message.UserEventBeginEndMessage;
import com.accompany.business.service.follow.FansService;
import com.accompany.business.service.mq.RocketMQService;
import com.accompany.business.service.user.UsersService;
import com.accompany.business.service.userevent.UserEventService;
import com.accompany.common.result.BusiResult;
@@ -32,13 +33,11 @@ public class UserEventAdminService {
@Autowired
private UserEventService userEventService;
@Autowired
private ActiveMQService activeMQService;
private RocketMQService rocketMQService;
@Autowired
private FansService fansService;
@Autowired
private RedissonClient redissonClient;
@Autowired
private DailyTaskInService dailyTaskInService;
public BusiResult<PageResult<UserEventAdminVO>> listEvent(Long erbanNo, Long roomErbanNo, Integer pageNo, Integer pageSize, Byte region) {
Long uid = null, roomUid = null;
@@ -127,13 +126,13 @@ public class UserEventAdminService {
List<Long> followUids = fansService.getFansUidListByUid(userEvent.getUid());
if (userEvent.getEventStartTime().after(now)) {//待发布 发活动开始通知
jsonObject.put("type", 1);
UserEventBeginMessage baseMessage = UserEventBeginMessage.builder().messId(UUIDUtil.get()).eventId(userEvent.getId()).endEvent(false).build();
activeMQService.userEventDelayMessage(baseMessage, userEvent.getEventStartTime().getTime() - System.currentTimeMillis());
UserEventBeginEndMessage baseMessage = UserEventBeginEndMessage.builder().messId(UUIDUtil.get()).eventId(userEvent.getId()).endEvent(false).build();
rocketMQService.sendUserEventDelayedMessage(baseMessage, (int)(userEvent.getEventStartTime().getTime() - System.currentTimeMillis()));
userEventService.notifyPush(followUids, jsonObject, userEvent);
} else if (userEvent.getEventStartTime().before(now) && userEvent.getEventEndTime().after(now)) { //已经发布 发活动发布通知
jsonObject.put("type", 2);
UserEventBeginMessage baseMessage = UserEventBeginMessage.builder().messId(UUIDUtil.get()).eventId(userEvent.getId()).endEvent(Boolean.TRUE).build();
activeMQService.userEventDelayMessage(baseMessage, userEvent.getEventEndTime().getTime() - System.currentTimeMillis());
UserEventBeginEndMessage baseMessage = UserEventBeginEndMessage.builder().messId(UUIDUtil.get()).eventId(userEvent.getId()).endEvent(Boolean.TRUE).build();
rocketMQService.sendUserEventDelayedMessage(baseMessage, (int)(userEvent.getEventEndTime().getTime() - System.currentTimeMillis()));
userEventService.notifyPush(followUids, jsonObject, userEvent);
}
}

View File

@@ -22,6 +22,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>com.accompany</groupId>
<artifactId>accompany-mq-web</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
<build>

View File

@@ -13,8 +13,8 @@ import org.springframework.context.annotation.Configuration;
* @description:
*/
@Slf4j
@Configuration
@ConditionalOnClass(value = RocketMQTemplate.class)
//@Configuration
//@ConditionalOnClass(value = RocketMQTemplate.class)
public class RocketMQConfiguration {
@Bean

View File

@@ -1,8 +1,14 @@
package com.accompany.admin.controller.api;
import cn.hutool.core.date.DateUtil;
import com.accompany.admin.service.api.MyApiService;
import com.accompany.business.message.UserEventBeginEndMessage;
import com.accompany.business.service.activity.h5.ActivityUserLevelExpService;
import com.accompany.business.service.mq.RocketMQService;
import com.accompany.business.service.room.RoomService;
import com.accompany.business.vo.RoomVo;
import com.accompany.common.netease.ErBanNetEaseService;
import com.accompany.common.netease.neteaseacc.result.RoomMemberRet;
import com.accompany.common.result.BusiResult;
import com.accompany.common.status.BusiStatus;
import com.accompany.common.utils.DateTimeUtil;
@@ -10,12 +16,10 @@ import com.accompany.core.exception.AdminServiceException;
import com.accompany.core.exception.ServiceException;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
import java.util.List;
@RestController
@RequestMapping("/admin/api")
@@ -24,6 +28,9 @@ public class MyApiController {
@Autowired
private MyApiService myApiService;
@Autowired
private RocketMQService rocketMQService;
@RequestMapping("/refreshRoomCharmRankList")
@ResponseBody
public BusiResult<Void> refreshRoomCharmRankList(Long roomId) {
@@ -272,4 +279,29 @@ public class MyApiController {
myApiService.testMail();
return BusiResult.success();
}
@PostMapping("/delayUserEventMq")
public BusiResult<Void> testMail(@RequestBody UserEventBeginEndMessage message, int delayMillis) {
message.setMsgDate(DateUtil.formatDateTime(new Date()));
rocketMQService.sendUserEventDelayedMessage(message, delayMillis);
return BusiResult.success();
}
@Autowired
private RoomService roomService;
/**
* 重刷管理员列表,获取房间管理员列表
*
* @return
*/
@GetMapping(value = "/reload/admin")
public BusiResult<List<RoomMemberRet>> reloadAdmin(@RequestParam(defaultValue = "false") Boolean refreshCache, Long roomErbanNo, String passWord) throws Exception {
if (!"12345".equals(passWord)){
return BusiResult.fail("error");
}
List<RoomMemberRet> roomMemberRets = myApiService.reloadAdminMember(refreshCache, roomErbanNo);
return BusiResult.success(roomMemberRets);
}
}

View File

@@ -10,6 +10,9 @@ public interface RoomConstant {
room_mic_dress,//房间麦位特效
room_used_mic_dress,//房间正在使用的麦位特效
user_manage_room,//房间管理列表
room_manage_user,//用户管理房间列表
;
@@ -23,4 +26,9 @@ public interface RoomConstant {
public static final int SKIN = 1;
public static final int EFFECT = 2;
}
interface RoomManageOperate {
int ADD = 1;
int REMOVE = 2;
}
}

View File

@@ -1,5 +1,6 @@
package com.accompany.business.model.userevent.vo;
package com.accompany.business.message;
import com.accompany.mq.model.BaseMqMessage;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -11,7 +12,8 @@ import java.io.Serializable;
@NoArgsConstructor
@Builder
@Data
public class UserEventBeginMessage implements Serializable {
public class UserEventBeginEndMessage extends BaseMqMessage implements Serializable{
private String msgDate;
private Boolean endEvent = false;
private Long eventId;
private String messId;

View File

@@ -1,7 +1,7 @@
package com.accompany.business.model.userevent.vo;
import com.accompany.business.model.userevent.UserEventSub;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.strawberry.business.model.userevent.UserEventSub;
import lombok.Data;
import lombok.EqualsAndHashCode;

View File

@@ -0,0 +1,17 @@
package com.accompany.business.vo.room;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class RoomManageDetailVO {
private Long roomUid;
private Long erbanNo;
private String avatar;
private String roomName;
}

View File

@@ -0,0 +1,12 @@
package com.accompany.business.vo.room;
import lombok.Data;
import java.util.List;
@Data
public class RoomManageVO {
private RoomManageDetailVO selfRoom;
private List<RoomManageDetailVO> manageRooms;
}

View File

@@ -147,5 +147,14 @@ public class RocketMQService {
throwable -> log.error("sendGameMsgPushMessage fail message: {}", JSON.toJSONString(gameMsgMessage), throwable));
}
/**
* 发送延时消息
* @param userEventBeginEndMessage 消息内容
* @param delayMillis 延时时长,时间戳
*/
public void sendUserEventDelayedMessage(UserEventBeginEndMessage userEventBeginEndMessage, int delayMillis) {
SendResult sendResult = mqMessageProducer.sendDelay(MqConstant.USER_ENEVT_DELAY_TOPIC, userEventBeginEndMessage, delayMillis);
log.info("sendUserEventDelayedMessage message: {} sendResult {}", JSON.toJSONString(userEventBeginEndMessage), JSON.toJSONString(sendResult));
}
}

View File

@@ -271,7 +271,7 @@ public class RoomManageService {
}
SpringContextHolder.getApplicationContext().publishEvent(new RoomPasswordChangeEvent(newRoom));
roomService.reloadAdminMember(true, roomUid);//临时加上处理管理员列表,兼容旧版本,后期可以移除android<=1.0.26, ios<=20.20.59
return convertRoomToVo(roomService.getRoomByUid(roomUid));
} catch (ServiceException e) {
log.error(e.getMessage(), e);

View File

@@ -89,8 +89,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.*;
import org.redisson.client.codec.LongCodec;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
@@ -108,8 +107,10 @@ import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.accompany.business.constant.RoomConstant.RedisKey.*;
/**
/**
* Created by liuguofu on 2017/5/20.
*/
@Service
@@ -555,6 +556,7 @@ public class RoomService extends BaseService {
//管理员或者创建者可以修改
if (stringBuffer.toString().equals(MANAGER)) {
updateRunningRoom(room);
this.reloadAdminMember(true, room.getUid());//临时加上处理管理员列表兼容旧版本android<=1.0.26, ios<=
} else {
log.error("updateRunningRoomByAdmin failed 3: {}", result);
busiResult.setCode(HttpStatus.URI_TOO_LONG.value());
@@ -2207,7 +2209,50 @@ public class RoomService extends BaseService {
}
public List<RoomMemberRet> reloadAdminMember(Boolean refreshCache, Long roomUid) {
List<RoomMemberRet> roomMemberRets = null;
try {
roomMemberRets = null;
Map<String, Object> roles = new HashMap<>();
roles.put("manager", true);
List<RoomVo> rooms = new ArrayList<>();
if (roomUid != null) {
RoomVo roomVoByUid = this.getRoomVoByUid(roomUid);
rooms.add(roomVoByUid);
} else {
rooms = this.getHomeRunningRoomList();
}
for (RoomVo room : rooms) {
roomMemberRets = null;
roomMemberRets = erBanNetEaseService.queryMembersByRole(room.getRoomId(), JSONObject.toJSONString(roles));
if (!refreshCache || CollectionUtils.isEmpty(roomMemberRets)){
continue;
}
roomUid = room.getUid();
this.roomManageUser(roomUid).delete();
for (RoomMemberRet roomMemberRet : roomMemberRets) {
Long accid = roomMemberRet.getAccid();
this.roomManageUser(roomUid).add(accid);
this.userManageRoom(accid).add(roomUid);
}
}
} catch (Exception e) {
log.info("MyApiService.reloadAdminMember, refreshCache:{}, roomUid:{},e:{}", refreshCache, roomUid, e.getMessage(), e);
}
return roomMemberRets;
}
public RScoredSortedSet<Long> getRoomUserZset(Long roomUid) {
return redissonClient.getScoredSortedSet(RedisKey.room_user_zset.getKey(roomUid.toString()), LongCodec.INSTANCE);
}
public RSet<Long> userManageRoom(Long uid) {
return redissonClient.getSet(user_manage_room.getKey(uid), LongCodec.INSTANCE);
}
public RSet<Long> roomManageUser(Long roomUid) {
return redissonClient.getSet(room_manage_user.getKey(roomUid), LongCodec.INSTANCE);
}
}

View File

@@ -1,7 +1,7 @@
package com.accompany.business.service.userevent;
import com.accompany.business.model.userevent.UserEventData;
import com.accompany.business.model.userevent.vo.UserEventBeginMessage;
import com.accompany.business.message.UserEventBeginEndMessage;
import com.accompany.business.model.userevent.vo.UserEventDataVO;
import com.accompany.common.result.BusiResult;
import com.accompany.common.result.PageResult;
@@ -19,7 +19,7 @@ public interface UserEventDataService extends IService<UserEventData> {
* 用户活动结束通知
* @param beginMessage
*/
void notifyEnd(UserEventBeginMessage beginMessage);
void notifyEnd(UserEventBeginEndMessage beginMessage);
BusiResult<PageResult<UserEventDataVO>> list(Integer pageNo, Integer pageSize, Byte region);
}

View File

@@ -1,7 +1,7 @@
package com.accompany.business.service.userevent;
import com.accompany.business.model.userevent.UserEvent;
import com.accompany.business.model.userevent.vo.UserEventBeginMessage;
import com.accompany.business.message.UserEventBeginEndMessage;
import com.accompany.business.model.userevent.vo.UserEventVO;
import com.accompany.common.result.BusiResult;
import com.alibaba.fastjson.JSONObject;
@@ -96,7 +96,7 @@ public interface UserEventService extends IService<UserEvent> {
* 用户活动开始通知
* @param beginMessage
*/
void notifyBegin(UserEventBeginMessage beginMessage);
void notifyBegin(UserEventBeginEndMessage beginMessage);
void notifyPush(List<Long> uids, JSONObject jsonObject, UserEvent userEvent);

View File

@@ -3,7 +3,7 @@ package com.accompany.business.service.userevent.impl;
import com.accompany.business.model.userevent.UserEvent;
import com.accompany.business.model.userevent.UserEventData;
import com.accompany.business.model.userevent.constant.UserEventConstant;
import com.accompany.business.model.userevent.vo.UserEventBeginMessage;
import com.accompany.business.message.UserEventBeginEndMessage;
import com.accompany.business.model.userevent.vo.UserEventDataVO;
import com.accompany.business.mybatismapper.userevent.UserEventDataMapper;
import com.accompany.business.service.gift.GiftSendRecordService;
@@ -21,12 +21,10 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.*;
/**
@@ -51,7 +49,7 @@ public class UserEventDataServiceImpl extends ServiceImpl<UserEventDataMapper, U
private GiftService giftService;
@Override
public void notifyEnd(UserEventBeginMessage beginMessage) {
public void notifyEnd(UserEventBeginEndMessage beginMessage) {
UserEvent userEvent = userEventService.getById(beginMessage.getEventId());
if (!(UserEventConstant.EventStatus.PASS == userEvent.getEventStatus())) {
log.info("用户活动未通过审核或者已经被删除UserEvent{}", JSONObject.toJSONString(userEvent));

View File

@@ -3,7 +3,7 @@ package com.accompany.business.service.userevent.impl;
import com.accompany.business.model.userevent.UserEvent;
import com.accompany.business.model.userevent.UserEventSub;
import com.accompany.business.model.userevent.constant.UserEventConstant;
import com.accompany.business.model.userevent.vo.UserEventBeginMessage;
import com.accompany.business.message.UserEventBeginEndMessage;
import com.accompany.business.model.userevent.vo.UserEventVO;
import com.accompany.business.mybatismapper.userevent.UserEventMapper;
import com.accompany.business.param.neteasepush.Body;
@@ -264,7 +264,7 @@ public class UserEventServiceImpl extends ServiceImpl<UserEventMapper, UserEvent
}
@Override
public void notifyBegin(UserEventBeginMessage beginMessage) {
public void notifyBegin(UserEventBeginEndMessage beginMessage) {
UserEvent userEvent = baseMapper.selectById(beginMessage.getEventId());
if (!(UserEventConstant.EventStatus.PASS == userEvent.getEventStatus())) {
log.info("用户活动未通过审核或者已经被删除UserEvent{}",JSONObject.toJSONString(userEvent));

View File

@@ -0,0 +1,76 @@
package com.accompany.business.controller.room;
import com.accompany.business.constant.RoomConstant;
import com.accompany.business.service.room.RoomService;
import com.accompany.business.service.user.UsersService;
import com.accompany.business.vo.room.RoomManageDetailVO;
import com.accompany.business.vo.room.RoomManageVO;
import com.accompany.common.result.BusiResult;
import com.accompany.core.model.Room;
import com.accompany.core.model.Users;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.accompany.common.constant.ApplicationConstant.PublicParameters.PUB_UID;
@RestController
@RequestMapping("/roomrole")
public class RoomRoleController {
@Autowired
private RoomService roomService;
@Autowired
private UsersService usersService;
@RequestMapping("/listroom")
public BusiResult<RoomManageVO> listRoom(@RequestHeader(value = PUB_UID) Long uid) {
RoomManageVO roomManageVO = new RoomManageVO();
Room room = roomService.getRoomByUid(uid);
if (room != null) {
Users users = usersService.getUsersByUid(uid);
roomManageVO.setSelfRoom(RoomManageDetailVO.builder()
.roomUid(room.getUid())
.avatar(room.getAvatar())
.erbanNo(users.getErbanNo())
.roomName(room.getTitle())
.build());
}
Set<Long> roomUids = roomService.userManageRoom(uid).readAll();
List<RoomManageDetailVO> list = new ArrayList<>();
for (Long s : roomUids) {
Long roomUid = Long.valueOf(s);
room = roomService.getRoomByUid(roomUid);
Users users = usersService.getUsersByUid(roomUid);
list.add(RoomManageDetailVO.builder()
.roomUid(room.getUid())
.avatar(room.getAvatar())
.erbanNo(users.getErbanNo())
.roomName(room.getTitle())
.build());
}
roomManageVO.setManageRooms(list);
return BusiResult.success(roomManageVO);
}
@RequestMapping("/manageOpt")
public BusiResult<Void> listRoom(Long roomUid, Long uid, @RequestParam(defaultValue = "0") Integer opt) {
if (RoomConstant.RoomManageOperate.ADD == opt) {
roomService.userManageRoom(uid).add(roomUid);
roomService.roomManageUser(roomUid).add(uid);
} else if (RoomConstant.RoomManageOperate.REMOVE == opt) {
roomService.userManageRoom(uid).remove(roomUid);
roomService.roomManageUser(roomUid).remove(uid);
}
return BusiResult.success();
}
}

View File

@@ -1,7 +1,6 @@
package com.accompany.business.controller.userevent;
import cn.hutool.core.date.DateUtil;
import com.accompany.business.model.userevent.vo.UserEventBeginMessage;
import com.accompany.business.message.UserEventBeginEndMessage;
import com.accompany.business.model.userevent.vo.UserEventVO;
import com.accompany.business.service.userevent.UserEventDataService;
import com.accompany.business.service.userevent.UserEventService;
@@ -11,13 +10,10 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import static com.accompany.common.constant.ApplicationConstant.PublicParameters.PUB_UID;
@@ -62,7 +58,7 @@ public class UserEventController {
// if (StringUtils.isNotEmpty(userEvent.getEventBanner())) {
// ImgUtilV2.staticPicCheck(userEvent.getEventBanner());
// }
userEventService.save(userEvent);
// userEventService.save(userEvent);
return BusiResult.success();
}
@@ -124,7 +120,7 @@ public class UserEventController {
@ApiOperation(value = "重跑数据", httpMethod = "GET")
@GetMapping("/reload")
public BusiResult<Void> list(Long eventId) {
userEventDataService.notifyEnd(UserEventBeginMessage.builder().endEvent(Boolean.TRUE).eventId(eventId).messId(UUIDUtil.get()).build());
userEventDataService.notifyEnd(UserEventBeginEndMessage.builder().endEvent(Boolean.TRUE).eventId(eventId).messId(UUIDUtil.get()).build());
return BusiResult.success();
}
}

View File

@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
@@ -189,4 +190,29 @@ public class MQMessageProducer {
}
}
/**
* @param queueName
* @param object
* @param delayMillis 对应定时的延时时间毫秒
* @param <T>
*/
public <T extends BaseMqMessage> SendResult sendDelay(String queueName, T object, Integer delayMillis) {
if (object == null) {
return null;
}
if (delayMillis == null) {
delayMillis = 0;
}
try {
String objectJson = JSON.toJSONString(object);
log.info("queueName : {}, message : {}, delayMillis:{}", queueName, objectJson, delayMillis);
SendResult sendResult = rocketMQTemplate.syncSendDelayTimeMills(queueName, objectJson, delayMillis);
log.info("queueName : {}, message : {}, delayMillis:{},sendResult:{}", queueName, objectJson, delayMillis, sendResult);
return sendResult;
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return null;
}
}

View File

@@ -0,0 +1,23 @@
package com.accompany.mq.consumer;
import cn.hutool.core.date.DateUtil;
import com.accompany.business.message.UserEventBeginEndMessage;
import com.accompany.mq.constant.MqConstant;
import com.accompany.mq.listener.AbstractMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
@ConditionalOnProperty(name = "spring.application.name", havingValue = "admin")
@RocketMQMessageListener(topic = MqConstant.USER_ENEVT_DELAY_TOPIC, consumerGroup = MqConstant.USER_ENEVT_DELAY_CONSUME_GROUP)
public class UserEventBeginEndConsumer extends AbstractMessageListener<UserEventBeginEndMessage> {
@Override
protected void onMessage(UserEventBeginEndMessage eventBeginEndMessage) throws Exception {
log.info("onMessage gameMsgMessage: {}, begin:{},now:{}", eventBeginEndMessage.toString(), eventBeginEndMessage.getMsgDate(), DateUtil.formatDateTime(new Date()));
}
}