Commit 88d43cba by hy

add necessary api for console end-point

parent a3b41ab1
...@@ -92,4 +92,44 @@ public class WorkerController { ...@@ -92,4 +92,44 @@ public class WorkerController {
} }
return Result.ok(false); return Result.ok(false);
} }
@PostMapping("/startTask")
public Result<?> startTask(String group, String worker, String taskId) {
WorkerNode workerNode = workerRegistry.getWorker(group, worker);
if (workerNode != null) {
WorkerBizClient client = workerNode.getClient();
return client.startTask(taskId);
}
return Result.ok(false);
}
@PostMapping("/stopTask")
public Result<?> stopTask(String group, String worker, String taskId) {
WorkerNode workerNode = workerRegistry.getWorker(group, worker);
if (workerNode != null) {
WorkerBizClient client = workerNode.getClient();
return client.stopTask(taskId);
}
return Result.ok(false);
}
@PostMapping("/pauseTask")
public Result<?> pauseTask(String group, String worker, String taskId) {
WorkerNode workerNode = workerRegistry.getWorker(group, worker);
if (workerNode != null) {
WorkerBizClient client = workerNode.getClient();
return client.pauseTask(taskId);
}
return Result.ok(false);
}
@PostMapping("/resumeTask")
public Result<?> resumeTask(String group, String worker, String taskId) {
WorkerNode workerNode = workerRegistry.getWorker(group, worker);
if (workerNode != null) {
WorkerBizClient client = workerNode.getClient();
return client.resumeTask(taskId);
}
return Result.ok(false);
}
} }
...@@ -38,6 +38,12 @@ public class WorkerApiRequestHandler implements RequestHandler { ...@@ -38,6 +38,12 @@ public class WorkerApiRequestHandler implements RequestHandler {
case "/stopTask": case "/stopTask":
result = workerBizApi.stopTask(request.getContent()); result = workerBizApi.stopTask(request.getContent());
break; break;
case "/pauseTask":
result = workerBizApi.pauseTask(request.getContent());
break;
case "/resumeTask":
result = workerBizApi.resumeTask(request.getContent());
break;
case "/getProcessors": case "/getProcessors":
result = workerBizApi.getProcessors(); result = workerBizApi.getProcessors();
break; break;
......
...@@ -20,6 +20,10 @@ public interface WorkerBizApi { ...@@ -20,6 +20,10 @@ public interface WorkerBizApi {
Result<Boolean> stopTask(String taskId); Result<Boolean> stopTask(String taskId);
Result<Boolean> pauseTask(String taskId);
Result<Boolean> resumeTask(String taskId);
Result<List<Task>> getTasks(); Result<List<Task>> getTasks();
Result<List<ProcessorVo>> getProcessors(); Result<List<ProcessorVo>> getProcessors();
......
...@@ -68,6 +68,28 @@ public class WorkerBizClient implements WorkerBizApi { ...@@ -68,6 +68,28 @@ public class WorkerBizClient implements WorkerBizApi {
} }
@Override @Override
public Result<Boolean> pauseTask(String taskId) {
try {
String result = HttpClient.post(workerUrl + "/pauseTask", taskId);
return JsonUtils.fromJson(result, new TypeToken<Result<Boolean>>() {
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Result<Boolean> resumeTask(String taskId) {
try {
String result = HttpClient.post(workerUrl + "/resumeTask", taskId);
return JsonUtils.fromJson(result, new TypeToken<Result<Boolean>>() {
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Result<List<Task>> getTasks() { public Result<List<Task>> getTasks() {
try { try {
String result = HttpClient.get(workerUrl + "/getTasks"); String result = HttpClient.get(workerUrl + "/getTasks");
......
...@@ -91,6 +91,26 @@ public class WorkerBizImpl implements WorkerBizApi { ...@@ -91,6 +91,26 @@ public class WorkerBizImpl implements WorkerBizApi {
} }
@Override @Override
public Result<Boolean> pauseTask(String taskId) {
try {
taskManager.pauseTask(taskId);
return Result.ok(true);
} catch (Exception e) {
return Result.ok(false);
}
}
@Override
public Result<Boolean> resumeTask(String taskId) {
try {
taskManager.resumeTask(taskId);
return Result.ok(true);
} catch (Exception e) {
return Result.ok(false);
}
}
@Override
public Result<List<Task>> getTasks() { public Result<List<Task>> getTasks() {
return Result.ok(taskManager.getTasks()); return Result.ok(taskManager.getTasks());
} }
......
...@@ -21,6 +21,7 @@ public class TaskManager { ...@@ -21,6 +21,7 @@ public class TaskManager {
private final Map<String, TaskState> taskStateMap = new HashMap<>(); private final Map<String, TaskState> taskStateMap = new HashMap<>();
private final Map<String, List<TaskThread>> taskExecutors = new HashMap<>(); private final Map<String, List<TaskThread>> taskExecutors = new HashMap<>();
private final Map<String, Task> tasks = new ConcurrentHashMap<>(); private final Map<String, Task> tasks = new ConcurrentHashMap<>();
private volatile List<String> taskIds = new ArrayList<>();
public TaskManager() { public TaskManager() {
...@@ -52,6 +53,10 @@ public class TaskManager { ...@@ -52,6 +53,10 @@ public class TaskManager {
} }
task.setParallelism(parallelism); task.setParallelism(parallelism);
tasks.put(taskId, task); tasks.put(taskId, task);
List<String> old = this.taskIds;
List<String> updatedTaskIds = new ArrayList<>(old);
updatedTaskIds.add(taskId);
taskIds = updatedTaskIds;
} }
private ProcessorPipeline buildPipeline(PipelineDef def) { private ProcessorPipeline buildPipeline(PipelineDef def) {
...@@ -89,14 +94,14 @@ public class TaskManager { ...@@ -89,14 +94,14 @@ public class TaskManager {
ts.add(t); ts.add(t);
} }
taskExecutors.put(taskId, ts); taskExecutors.put(taskId, ts);
taskStateMap.put(taskId, TaskState.STARTED); taskStateMap.put(taskId, TaskState.RUNNING);
updateState(taskId, TaskState.STARTED); updateState(taskId, TaskState.RUNNING);
} }
} }
public synchronized void pauseTask(String taskId) { public synchronized void pauseTask(String taskId) {
TaskState taskState = taskStateMap.get(taskId); TaskState taskState = taskStateMap.get(taskId);
if (taskState == TaskState.STARTED) { if (taskState == TaskState.RUNNING) {
taskStateMap.put(taskId, TaskState.PAUSED); taskStateMap.put(taskId, TaskState.PAUSED);
taskExecutors.get(taskId).forEach(TaskThread::pause); taskExecutors.get(taskId).forEach(TaskThread::pause);
updateState(taskId, TaskState.PAUSED); updateState(taskId, TaskState.PAUSED);
...@@ -106,15 +111,15 @@ public class TaskManager { ...@@ -106,15 +111,15 @@ public class TaskManager {
public synchronized void resumeTask(String taskId) { public synchronized void resumeTask(String taskId) {
TaskState taskState = taskStateMap.get(taskId); TaskState taskState = taskStateMap.get(taskId);
if (taskState == TaskState.PAUSED) { if (taskState == TaskState.PAUSED) {
taskStateMap.put(taskId, TaskState.STARTED); taskStateMap.put(taskId, TaskState.RUNNING);
taskExecutors.get(taskId).forEach(TaskThread::proceed); taskExecutors.get(taskId).forEach(TaskThread::proceed);
updateState(taskId, TaskState.STARTED); updateState(taskId, TaskState.RUNNING);
} }
} }
public synchronized void terminateTask(String taskId) { public synchronized void terminateTask(String taskId) {
TaskState taskState = taskStateMap.get(taskId); TaskState taskState = taskStateMap.get(taskId);
if (taskState == TaskState.STARTED || taskState == TaskState.PAUSED) { if (taskState == TaskState.RUNNING || taskState == TaskState.PAUSED) {
taskStateMap.put(taskId, TaskState.TERMINATED); taskStateMap.put(taskId, TaskState.TERMINATED);
taskExecutors.get(taskId).forEach(TaskThread::terminate); taskExecutors.get(taskId).forEach(TaskThread::terminate);
updateState(taskId, TaskState.TERMINATED); updateState(taskId, TaskState.TERMINATED);
...@@ -126,7 +131,15 @@ public class TaskManager { ...@@ -126,7 +131,15 @@ public class TaskManager {
} }
public List<Task> getTasks() { public List<Task> getTasks() {
return new ArrayList<>(tasks.values()); ArrayList<Task> result = new ArrayList<>();
List<String> taskIds = this.taskIds;
for (String taskId : taskIds) {
Task task = tasks.get(taskId);
if (task != null) {
result.add(tasks.get(taskId));
}
}
return result;
} }
......
...@@ -3,7 +3,7 @@ package com.tbyf.dataadapter.task; ...@@ -3,7 +3,7 @@ package com.tbyf.dataadapter.task;
public enum TaskState { public enum TaskState {
NEW, NEW,
STARTED, RUNNING,
PAUSED, PAUSED,
TERMINATED TERMINATED
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment