Commit 068e37b3 by yuwei

项目初始化

parent eb446951
...@@ -3,24 +3,27 @@ package cn.datax.common.rabbitmq.config; ...@@ -3,24 +3,27 @@ package cn.datax.common.rabbitmq.config;
public class RabbitMqConstant { public class RabbitMqConstant {
/** /**
* RabbitMQ的FANOUT_EXCHANG交换机类型的名称 * FANOUT类型的交换机:api发布与撤销
*/ */
public static final String FANOUT_EXCHANGE_API_RELEASE_NAME = "fanout.exchange.api.release.name"; public static final String FANOUT_EXCHANGE_API = "fanout.exchange.api";
/** /**
* RabbitMQ的FANOUT_EXCHANG交换机类型的队列API发布的名称 * FANOUT类型的队列:api发布与撤销
* 消费者1
*/ */
public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_API_RELEASE1 = "fanout.api.release1"; public static final String FANOUT_API_QUEUE = "fanout.api.queue";
/** /**
* RabbitMQ的FANOUT_EXCHANG交换机类型的名称 * TOPIC类型的交换机:工作流
*/ */
public static final String FANOUT_EXCHANGE_API_CANCEL_NAME = "fanout.exchange.api.cancel.name"; public static final String TOPIC_EXCHANGE_WORKFLOW = "topic.exchange.workflow";
/** /**
* RabbitMQ的FANOUT_EXCHANG交换机类型的队列API注销的名称 * TOPIC类型的队列:工作流
* 消费者1
*/ */
public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_API_CANCEL1 = "fanout.api.cancel1"; public static final String TOPIC_WORKFLOW_QUEUE = "topic.workflow.queue";
/**
* TOPIC类型的路由键:工作流 {}占位符替换
*/
public static final String TOPIC_WORKFLOW_KEY = "topic.workflow.key.{}";
} }
...@@ -11,55 +11,40 @@ import org.springframework.amqp.rabbit.annotation.Queue; ...@@ -11,55 +11,40 @@ import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.context.annotation.Configuration;
import java.util.Map;
@Slf4j @Slf4j
@Component @Configuration
public class RabbitMqListenerConfig { public class RabbitMqListenerConfig {
@Autowired @Autowired
private QueueHandlerService queueHandlerService; private QueueHandlerService queueHandlerService;
private static String HANDLER_RELEASE = "1";
private static String HANDLER_CANCEL = "2";
/** /**
* Fanout 交换机 * api发布与撤销
* 消费注册 * @param map type 1:发布 2:撤销
* @param channel
* @param message
* @return * @return
* @throws Exception
*/ */
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = RabbitMqConstant.FANOUT_EXCHANGE_API_RELEASE_NAME, type = "fanout", durable = "true", autoDelete = "false"), @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = RabbitMqConstant.FANOUT_EXCHANGE_API, type = "fanout", durable = "true", autoDelete = "false"),
value = @Queue(value = RabbitMqConstant.FANOUT_EXCHANGE_QUEUE_TOPIC_API_RELEASE1, durable = "true", exclusive = "false", autoDelete = "false"))) value = @Queue(value = RabbitMqConstant.FANOUT_API_QUEUE, durable = "true", exclusive = "false", autoDelete = "false")))
public String fanoutQueueRelease(String id, Channel channel, Message message) throws Exception { public String fanoutQueueRelease(Map map, Channel channel, Message message) throws Exception {
try { try {
log.info("fanoutQueueRelease接收到了:{}", id); String id = (String) map.get("id");
String type = (String) map.get("type");
log.info("fanoutQueueRelease接收到了:{},{}", id, type);
if (HANDLER_RELEASE.equals(type)) {
queueHandlerService.handlerRelease(id); queueHandlerService.handlerRelease(id);
// 手动确认 } else if (HANDLER_CANCEL.equals(type)) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return id;
}catch (Exception e){
log.error("全局异常信息ex={}, StackTrace={}", e.getMessage(), ThrowableUtil.getStackTrace(e));
if (message.getMessageProperties().getRedelivered()){
log.error("消息已处理,请勿重复处理!");
// 拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}else {
//记录日志
log.error("消息消费失败处理:{}",e.getMessage());
//第一个参数为消息的index,第二个参数是是否批量处理,第三个参数为是否让被拒绝的消息重新入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
return null;
}
/**
* Fanout 交换机
* 消费注销
*/
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = RabbitMqConstant.FANOUT_EXCHANGE_API_CANCEL_NAME, type = "fanout", durable = "true", autoDelete = "false"),
value = @Queue(value = RabbitMqConstant.FANOUT_EXCHANGE_QUEUE_TOPIC_API_CANCEL1, durable = "true", exclusive = "false", autoDelete = "false")))
public String fanoutQueueCancel(String id, Channel channel, Message message) throws Exception {
try {
log.info("fanoutQueueCancel接收到了:{}", id);
queueHandlerService.handlerCancel(id); queueHandlerService.handlerCancel(id);
}
// 手动确认 // 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return id; return id;
......
...@@ -363,7 +363,7 @@ public class DataApiServiceImpl extends BaseServiceImpl<DataApiDao, DataApiEntit ...@@ -363,7 +363,7 @@ public class DataApiServiceImpl extends BaseServiceImpl<DataApiDao, DataApiEntit
@Override @Override
public void releaseDataApi(String id) { public void releaseDataApi(String id) {
String obj = (String) Optional.ofNullable(rabbitTemplate.convertSendAndReceive(RabbitMqConstant.FANOUT_EXCHANGE_API_RELEASE_NAME, "", id)).orElse(""); String obj = (String) Optional.ofNullable(rabbitTemplate.convertSendAndReceive(RabbitMqConstant.FANOUT_API_QUEUE, "", id)).orElse("");
if (StrUtil.isNotBlank(obj)) { if (StrUtil.isNotBlank(obj)) {
DataApiEntity dataApiEntity = new DataApiEntity(); DataApiEntity dataApiEntity = new DataApiEntity();
dataApiEntity.setId(id); dataApiEntity.setId(id);
...@@ -374,7 +374,7 @@ public class DataApiServiceImpl extends BaseServiceImpl<DataApiDao, DataApiEntit ...@@ -374,7 +374,7 @@ public class DataApiServiceImpl extends BaseServiceImpl<DataApiDao, DataApiEntit
@Override @Override
public void cancelDataApi(String id) { public void cancelDataApi(String id) {
String obj = (String) Optional.ofNullable(rabbitTemplate.convertSendAndReceive(RabbitMqConstant.FANOUT_EXCHANGE_API_CANCEL_NAME, "", id)).orElse(""); String obj = (String) Optional.ofNullable(rabbitTemplate.convertSendAndReceive(RabbitMqConstant.FANOUT_API_QUEUE, "", id)).orElse("");
if (StrUtil.isNotBlank(obj)) { if (StrUtil.isNotBlank(obj)) {
DataApiEntity dataApiEntity = new DataApiEntity(); DataApiEntity dataApiEntity = new DataApiEntity();
dataApiEntity.setId(id); dataApiEntity.setId(id);
......
...@@ -79,6 +79,11 @@ ...@@ -79,6 +79,11 @@
<artifactId>workflow-service-api</artifactId> <artifactId>workflow-service-api</artifactId>
<version>2.0.0</version> <version>2.0.0</version>
</dependency> </dependency>
<dependency>
<groupId>cn.datax</groupId>
<artifactId>datax-common-rabbitmq</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
package cn.datax.service.data.masterdata.config;
import cn.datax.common.rabbitmq.config.RabbitMqConstant;
import cn.datax.common.utils.ThrowableUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class RabbitMqListenerConfig {
/**
* 消费工作流 业务编码 6011
* @param id
* @param channel
* @param message
* @return
* @throws Exception
*/
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = RabbitMqConstant.TOPIC_EXCHANGE_WORKFLOW, type = "topic", durable = "true", autoDelete = "false"),
key = {""},
value = @Queue(value = RabbitMqConstant.TOPIC_WORKFLOW_QUEUE, durable = "true", exclusive = "false", autoDelete = "false")))
public String fanoutQueueRelease(String id, Channel channel, Message message) throws Exception {
try {
log.info("fanoutQueueRelease接收到了:{}", id);
// 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return id;
}catch (Exception e){
log.error("全局异常信息ex={}, StackTrace={}", e.getMessage(), ThrowableUtil.getStackTrace(e));
if (message.getMessageProperties().getRedelivered()){
log.error("消息已处理,请勿重复处理!");
// 拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}else {
//记录日志
log.error("消息消费失败处理:{}",e.getMessage());
//第一个参数为消息的index,第二个参数是是否批量处理,第三个参数为是否让被拒绝的消息重新入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
return null;
}
}
...@@ -74,6 +74,11 @@ ...@@ -74,6 +74,11 @@
<artifactId>flowable-spring-boot-starter-process</artifactId> <artifactId>flowable-spring-boot-starter-process</artifactId>
<version>${flowable.version}</version> <version>${flowable.version}</version>
</dependency> </dependency>
<dependency>
<groupId>cn.datax</groupId>
<artifactId>datax-common-rabbitmq</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment