# Part03-集成RabbitMQ保证ES随文章增删改查-实时更新

blog
│  pom.xml
│
└─src
│  └─main
│      ├─java
│      │   └─org
│      │      └─myslayers
│      │          ├─config
│      │          │      RabbitConfig.java
│      │          │
│      │          ├─controller
│      │          │      BaseController.java
│      │          │      PostController.java
│      │          │
│      │          ├─service
│      │          │  │   SearchService.java
│      │          │  │
│      │          │  └─impl
│      │          │         SearchServiceImpl.java
│      │          │
│      │          └─search
│      │             └─amqp
│      │                    MqMessageHandler.java
│      │                    PostMqIndexMessage.java
│      │
│      └─resources
│          │  application.yml

# 3.1 集成 RabbitMQ 环境

  • pom.xml :项目依赖,【RabbitMQ 消息同步】
<dependencies>
  <!--rabbitmq:消息同步-->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.1.2.RELEASE</version>
  </dependency>
</dependencies>
  • application.yml :配置文件,【RabbitMQ 消息同步】
spring:
  rabbitmq:
    username: guest
    password: guest
    host: 127.0.0.1
    port: 5672

# 3.2 配置 RabbitMQ 环境

  • RabbitConfig.java :配置类,【创建队列、交换机,并把它们通过 es_bind_key 进行绑定】
/**
 * RabbitConfig:配置类
 */
@Configuration
public class RabbitConfig {

    public final static String es_queue = "es_queue";
    public final static String es_exchage = "es_exchage";
    public final static String es_bind_key = "es_exchage";

    //队列
    @Bean
    public Queue exQueue() {
        return new Queue(es_queue);
    }

    //交换机
    @Bean
    DirectExchange exchange() {
        return new DirectExchange(es_exchage);
    }

    //绑定队列与交换机
    @Bean
    Binding binding(Queue exQueue, DirectExchange exchange) {
        return BindingBuilder.bind(exQueue).to(exchange).with(es_bind_key);
    }
}
  • PostMqIndexMessage.java :实体类,供 -> 【/post/submit、/post/delete】 -> 使用 convertAndSend 【 交换机,路由密钥,发送的消息(操作的文章、操作的类型) 】
/**
 * PostMqIndexMessage:实体类
 * 供 -> 【/post/submit、/post/delete】 -> 使用 convertAndSend 【 交换机,路由密钥,发送的消息(操作的文章、操作的类型) 】
 */
@Data
@AllArgsConstructor
public class PostMqIndexMessage implements Serializable {

    // 两种type:一种是create_update、一种是remove
    public final static String CREATE_OR_UPDATE = "create_update";
    public final static String REMOVE = "remove";

    // 操作的文章:postId
    private Long postId;

    // 操作的类型:增删改查
    private String type;

}
  • MqMessageHandler.java :执行类,【执行操作的逻辑】
/**
 * RabbitMQ:执行操作的逻辑
 */
@Slf4j
@Component
@RabbitListener(queues = RabbitConfig.es_queue) //监听的队列es_queue
public class MqMessageHandler {

    @Autowired
    SearchService searchService;

    @RabbitHandler
    public void handler(org.myslayers.search.amqp.PostMqIndexMessage message) {

        log.info("mq 收到一条消息: {}", message.toString());

        switch (message.getType()) {
            //类型:创建或更新,【CREATE_OR_UPDATE】
            case PostMqIndexMessage.CREATE_OR_UPDATE:
                searchService.createOrUpdateIndex(message);
                break;

            //类型:删除,【REMOVE】
            case PostMqIndexMessage.REMOVE:
                searchService.removeIndex(message);
                break;

            //其他类型:输出错误日志
            default:
                log.error("没找到对应的消息类型,请注意!! --》 {}", message.toString());
                break;
        }
    }
}
  • SearchServiceImpl.java :业务层实现,【创建/更新文章】、删除文章
@Slf4j
@Service
public class SearchServiceImpl implements SearchService {

    @Autowired
    PostRepository postRepository;

    @Autowired
    PostService postService;

    /**
     * ES:createOrUpdateIndex 创建/更新文章
     */
    @Override
    public void createOrUpdateIndex(PostMqIndexMessage message) {
        //根据message.getPostId() -> 查询文章
        PostVo postVo = postService.selectOnePost(
            new QueryWrapper<Post>().eq("p.id", message.getPostId())
        );

        //将postVo -> PostDocment
        PostDocment postDocment = new ModelMapper().map(postVo, PostDocment.class);
        postRepository.save(postDocment);

        log.info("es 索引更新成功! ---> {}", postDocment.toString());
    }

    /**
     * ES:removeIndex 删除文章
     */
    @Override
    public void removeIndex(PostMqIndexMessage message) {
        //根据message.getPostId() -> 删除文章
        postRepository.deleteById(message.getPostId());

        log.info("es 索引删除成功! ---> {}", message.toString());
    }
}

# 3.3 使用 RabbitMQ 保证 ES 随文章增删改查-实时更新

  • PostController.java :控制层,【消息同步,通知消息给 RabbitMQ,告知 ES【更新文章或添加文章】、【删除文章】】
@Controller
public class PostController extends BaseController {
    /**
     * 详情detail:【删除】文章
     */
    @ResponseBody
    @Transactional
    @PostMapping("/post/delete")
    public Result delete(Long id) {
        Post post = postService.getById(id);
        Assert.notNull(post, "该帖子已被删除");
        Assert.isTrue(post.getUserId().longValue() == getProfileId().longValue(), "无权限删除此文章!");

        // 删除-该篇文章【该篇文章】-Post
        postService.removeById(id);
        // 删除-我的消息【收到消息】-UserMessage中的post_id
        messageService.removeByMap(MapUtil.of("post_id", id));
        // 删除-用户中心【收藏的帖】-UserCollection中的post_id
        collectionService.removeByMap(MapUtil.of("post_id", id));

        // RabbitMQ:消息同步,通知消息给RabbitMQ,告知【更新或添加】
        // convertAndSend 【 交换机,路由密钥,发送的消息(操作的文章、操作的类型) 】
        amqpTemplate.convertAndSend(RabbitConfig.es_exchage, RabbitConfig.es_bind_key, new PostMqIndexMessage(post.getId(), PostMqIndexMessage.REMOVE));

        return Result.success().action("/user/index");
    }

    /**
     * 添加/编辑edit:【提交】文章
     */
    @ResponseBody
    @PostMapping("/post/submit")
    public Result submit(Post post) {
        // 使用ValidationUtil工具类,校验【输入是否错误】
        ValidationUtil.ValidResult validResult = ValidationUtil.validateBean(post);
        if (validResult.hasErrors()) {
            return Result.fail(validResult.getErrors());
        }

        // 在传入【req.setAttribute("post", post);】后,同一页面请求的数据,可以通过post.getId()查询到【id】
        // 如果id不存在,则为【添加-文章】
        if (post.getId() == null) {
            post.setUserId(getProfileId());
            post.setModified(new Date());
            post.setCreated(new Date());
            post.setCommentCount(0);
            post.setEditMode(null);
            post.setLevel(0);
            post.setRecommend(false);
            post.setViewCount(0);
            post.setVoteDown(0);
            post.setVoteUp(0);
            postService.save(post);
        } else {
            // 如果id存在,则为【更新-文章】
            Post tempPost = postService.getById(post.getId());
            Assert.isTrue(tempPost.getUserId().longValue() == getProfileId().longValue(), "无权限编辑此文章!");
            tempPost.setTitle(post.getTitle());
            tempPost.setContent(post.getContent());
            tempPost.setCategoryId(post.getCategoryId());
            postService.updateById(tempPost);
        }

        // RabbitMQ:消息同步,通知消息给RabbitMQ,告知【更新或添加】
        // convertAndSend 【 交换机,路由密钥,发送的消息(操作的文章、操作的类型) 】
        amqpTemplate.convertAndSend(RabbitConfig.es_exchage, RabbitConfig.es_bind_key, new PostMqIndexMessage(post.getId(), PostMqIndexMessage.CREATE_OR_UPDATE));

        // 无论id是否存在,两类情况都会 retern 跳转到 /post/${id}
        return Result.success().action("/post/" + post.getId());
    }
}
Last Updated: 6/6/2024, 7:53:07 AM