Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
D
datax-cloud
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
黄营
datax-cloud
Commits
7262f497
Commit
7262f497
authored
Sep 20, 2020
by
yuwei
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
项目初始化
parent
48a21542
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
82 additions
and
50 deletions
+82
-50
SecurityUtil.java
...ore/src/main/java/cn/datax/common/utils/SecurityUtil.java
+3
-3
DataMetaObjectHandler.java
...cn/datax/common/mybatis/config/DataMetaObjectHandler.java
+4
-1
RabbitMqListenerConfig.java
...ce/data/market/mapping/config/RabbitMqListenerConfig.java
+29
-8
RabbitConfig.java
...ava/cn/datax/service/data/market/config/RabbitConfig.java
+32
-0
DataApiServiceImpl.java
.../service/data/market/service/impl/DataApiServiceImpl.java
+14
-38
No files found.
datax-common/datax-common-core/src/main/java/cn/datax/common/utils/SecurityUtil.java
View file @
7262f497
...
...
@@ -37,7 +37,7 @@ public class SecurityUtil {
if
(
user
!=
null
){
return
user
.
getId
();
}
return
null
;
return
""
;
}
/**
...
...
@@ -50,7 +50,7 @@ public class SecurityUtil {
if
(
user
!=
null
){
return
user
.
getDept
();
}
return
null
;
return
""
;
}
/**
...
...
@@ -63,7 +63,7 @@ public class SecurityUtil {
if
(
user
!=
null
){
return
user
.
getUsername
();
}
return
null
;
return
""
;
}
/**
...
...
datax-common/datax-common-mybatis/src/main/java/cn/datax/common/mybatis/config/DataMetaObjectHandler.java
View file @
7262f497
...
...
@@ -24,7 +24,10 @@ public class DataMetaObjectHandler implements MetaObjectHandler {
if
(
bolCreateDept
)
{
this
.
strictInsertFill
(
metaObject
,
"createDept"
,
String
.
class
,
getUserDeptId
());
}
this
.
strictInsertFill
(
metaObject
,
"flowStatus"
,
String
.
class
,
DataConstant
.
AuditState
.
WAIT
);
boolean
bolFlowStatus
=
metaObject
.
hasSetter
(
"flowStatus"
);
if
(
bolFlowStatus
)
{
this
.
strictInsertFill
(
metaObject
,
"flowStatus"
,
String
.
class
,
DataConstant
.
AuditState
.
WAIT
);
}
}
@Override
...
...
datax-modules/data-market-service-parent/data-market-service-mapping/src/main/java/cn/datax/service/data/market/mapping/config/RabbitMqListenerConfig.java
View file @
7262f497
...
...
@@ -23,20 +23,31 @@ public class RabbitMqListenerConfig {
/**
* Fanout 交换机
* 消费注册
* @return
*/
@RabbitListener
(
bindings
=
@QueueBinding
(
exchange
=
@Exchange
(
name
=
RabbitMqConstant
.
FANOUT_EXCHANGE_API_RELEASE_NAME
,
type
=
"fanout"
,
durable
=
"true"
,
autoDelete
=
"false"
),
value
=
@Queue
(
value
=
RabbitMqConstant
.
FANOUT_EXCHANGE_QUEUE_TOPIC_API_RELEASE1
,
durable
=
"true"
,
exclusive
=
"false"
,
autoDelete
=
"false"
)))
public
void
fanoutQueueRelease
(
String
id
,
Channel
channel
,
Message
message
)
throws
Exception
{
public
String
fanoutQueueRelease
(
String
id
,
Channel
channel
,
Message
message
)
throws
Exception
{
try
{
log
.
info
(
"fanoutQueueRelease接收到了:{}"
,
id
);
queueHandlerService
.
handlerRelease
(
id
);
// 手动确认
channel
.
basicAck
(
message
.
getMessageProperties
().
getDeliveryTag
(),
true
);
channel
.
basicAck
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
);
return
id
;
}
catch
(
Exception
e
){
log
.
error
(
"全局异常信息ex={}, StackTrace={}"
,
e
.
getMessage
(),
ThrowableUtil
.
getStackTrace
(
e
));
// 拒绝策略
channel
.
basicReject
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
);
if
(
message
.
getMessageProperties
().
getRedelivered
()){
log
.
error
(
"消息已处理,请勿重复处理!"
);
// 拒绝消息
channel
.
basicReject
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
);
}
else
{
//记录日志
log
.
error
(
"消息消费失败处理:{}"
,
e
.
getMessage
());
//第一个参数为消息的index,第二个参数是是否批量处理,第三个参数为是否让被拒绝的消息重新入队列
channel
.
basicNack
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
,
false
);
}
}
return
null
;
}
/**
...
...
@@ -45,16 +56,26 @@ public class RabbitMqListenerConfig {
*/
@RabbitListener
(
bindings
=
@QueueBinding
(
exchange
=
@Exchange
(
name
=
RabbitMqConstant
.
FANOUT_EXCHANGE_API_CANCEL_NAME
,
type
=
"fanout"
,
durable
=
"true"
,
autoDelete
=
"false"
),
value
=
@Queue
(
value
=
RabbitMqConstant
.
FANOUT_EXCHANGE_QUEUE_TOPIC_API_CANCEL1
,
durable
=
"true"
,
exclusive
=
"false"
,
autoDelete
=
"false"
)))
public
void
fanoutQueueCancel
(
String
id
,
Channel
channel
,
Message
message
)
throws
Exception
{
public
String
fanoutQueueCancel
(
String
id
,
Channel
channel
,
Message
message
)
throws
Exception
{
try
{
log
.
info
(
"fanoutQueueCancel接收到了:{}"
,
id
);
queueHandlerService
.
handlerCancel
(
id
);
// 手动确认
channel
.
basicAck
(
message
.
getMessageProperties
().
getDeliveryTag
(),
true
);
channel
.
basicAck
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
);
return
id
;
}
catch
(
Exception
e
){
log
.
error
(
"全局异常信息ex={}, StackTrace={}"
,
e
.
getMessage
(),
ThrowableUtil
.
getStackTrace
(
e
));
// 拒绝策略
channel
.
basicReject
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
);
if
(
message
.
getMessageProperties
().
getRedelivered
()){
log
.
error
(
"消息已处理,请勿重复处理!"
);
// 拒绝消息
channel
.
basicReject
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
);
}
else
{
//记录日志
log
.
error
(
"消息消费失败处理:{}"
,
e
.
getMessage
());
//第一个参数为消息的index,第二个参数是是否批量处理,第三个参数为是否让被拒绝的消息重新入队列
channel
.
basicNack
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
,
false
);
}
}
return
null
;
}
}
datax-modules/data-market-service-parent/data-market-service/src/main/java/cn/datax/service/data/market/config/RabbitConfig.java
0 → 100644
View file @
7262f497
package
cn
.
datax
.
service
.
data
.
market
.
config
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.amqp.rabbit.connection.CachingConnectionFactory
;
import
org.springframework.amqp.rabbit.core.RabbitTemplate
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
@Slf4j
@Configuration
public
class
RabbitConfig
{
@Bean
public
RabbitTemplate
rabbitTemplate
(
CachingConnectionFactory
connectionFactory
)
{
RabbitTemplate
rabbitTemplate
=
new
RabbitTemplate
(
connectionFactory
);
// 消息是否成功发送到Exchange
rabbitTemplate
.
setConfirmCallback
((
correlationData
,
ack
,
cause
)
->
{
if
(
ack
)
{
log
.
info
(
"消息成功发送到Exchange"
);
}
else
{
log
.
info
(
"消息发送到Exchange失败, {}, cause: {}"
,
correlationData
,
cause
);
}
});
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate
.
setMandatory
(
true
);
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate
.
setReturnCallback
((
message
,
replyCode
,
replyText
,
exchange
,
routingKey
)
->
{
log
.
info
(
"消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}"
,
exchange
,
routingKey
,
replyCode
,
replyText
,
message
);
});
return
rabbitTemplate
;
}
}
datax-modules/data-market-service-parent/data-market-service/src/main/java/cn/datax/service/data/market/service/impl/DataApiServiceImpl.java
View file @
7262f497
...
...
@@ -363,48 +363,24 @@ public class DataApiServiceImpl extends BaseServiceImpl<DataApiDao, DataApiEntit
@Override
public
void
releaseDataApi
(
String
id
)
{
// 消息是否成功发送到Exchange
rabbitTemplate
.
setConfirmCallback
((
correlationData
,
ack
,
cause
)
->
{
if
(
ack
)
{
log
.
info
(
"消息成功发送到Exchange"
);
DataApiEntity
dataApiEntity
=
new
DataApiEntity
();
dataApiEntity
.
setId
(
id
);
dataApiEntity
.
setStatus
(
DataConstant
.
ApiState
.
RELEASE
.
getKey
());
dataApiDao
.
updateById
(
dataApiEntity
);
}
else
{
log
.
error
(
"消息发送到Exchange失败, {}, cause: {}"
,
correlationData
,
cause
);
}
});
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate
.
setMandatory
(
true
);
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate
.
setReturnCallback
((
message
,
replyCode
,
replyText
,
exchange
,
routingKey
)
->
{
log
.
error
(
"消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}"
,
exchange
,
routingKey
,
replyCode
,
replyText
,
message
);
});
rabbitTemplate
.
convertAndSend
(
RabbitMqConstant
.
FANOUT_EXCHANGE_API_RELEASE_NAME
,
""
,
id
);
String
obj
=
(
String
)
Optional
.
ofNullable
(
rabbitTemplate
.
convertSendAndReceive
(
RabbitMqConstant
.
FANOUT_EXCHANGE_API_RELEASE_NAME
,
""
,
id
)).
orElse
(
""
);
if
(
StrUtil
.
isNotBlank
(
obj
))
{
DataApiEntity
dataApiEntity
=
new
DataApiEntity
();
dataApiEntity
.
setId
(
id
);
dataApiEntity
.
setStatus
(
DataConstant
.
ApiState
.
RELEASE
.
getKey
());
dataApiDao
.
updateById
(
dataApiEntity
);
}
}
@Override
public
void
cancelDataApi
(
String
id
)
{
// 消息是否成功发送到Exchange
rabbitTemplate
.
setConfirmCallback
((
correlationData
,
ack
,
cause
)
->
{
if
(
ack
)
{
log
.
info
(
"消息成功发送到Exchange"
);
DataApiEntity
dataApiEntity
=
new
DataApiEntity
();
dataApiEntity
.
setId
(
id
);
dataApiEntity
.
setStatus
(
DataConstant
.
ApiState
.
CANCEL
.
getKey
());
dataApiDao
.
updateById
(
dataApiEntity
);
}
else
{
log
.
error
(
"消息发送到Exchange失败, {}, cause: {}"
,
correlationData
,
cause
);
}
});
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate
.
setMandatory
(
true
);
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate
.
setReturnCallback
((
message
,
replyCode
,
replyText
,
exchange
,
routingKey
)
->
{
log
.
error
(
"消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}"
,
exchange
,
routingKey
,
replyCode
,
replyText
,
message
);
});
rabbitTemplate
.
convertAndSend
(
RabbitMqConstant
.
FANOUT_EXCHANGE_API_CANCEL_NAME
,
""
,
id
);
String
obj
=
(
String
)
Optional
.
ofNullable
(
rabbitTemplate
.
convertSendAndReceive
(
RabbitMqConstant
.
FANOUT_EXCHANGE_API_CANCEL_NAME
,
""
,
id
)).
orElse
(
""
);
if
(
StrUtil
.
isNotBlank
(
obj
))
{
DataApiEntity
dataApiEntity
=
new
DataApiEntity
();
dataApiEntity
.
setId
(
id
);
dataApiEntity
.
setStatus
(
DataConstant
.
ApiState
.
CANCEL
.
getKey
());
dataApiDao
.
updateById
(
dataApiEntity
);
}
}
@Override
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment