Commit 84bc4b27 by hy

add necessary api for console end-point

parent 32e961b4
...@@ -32,6 +32,11 @@ ...@@ -32,6 +32,11 @@
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<!--注册中心客户端--> <!--注册中心客户端-->
<dependency> <dependency>
<groupId>com.alibaba.cloud</groupId> <groupId>com.alibaba.cloud</groupId>
......
...@@ -112,4 +112,9 @@ public class PluginController { ...@@ -112,4 +112,9 @@ public class PluginController {
} }
return Result.ok(result); return Result.ok(result);
} }
@GetMapping("/plugins")
public Result<?> getPlugins() {
return Result.ok(processorManager.getPlugins());
}
} }
package com.tbyf.dataadapter.console.controlle; package com.tbyf.dataadapter.console.controlle;
import com.tbyf.dataadapter.api.impl.WorkerBizClient;
import com.tbyf.dataadapter.api.vo.Result; import com.tbyf.dataadapter.api.vo.Result;
import com.tbyf.dataadapter.console.model.dto.GroupDTO; import com.tbyf.dataadapter.console.model.dto.GroupDTO;
import com.tbyf.dataadapter.console.model.dto.TaskDefDTO;
import com.tbyf.dataadapter.registry.GroupNode; import com.tbyf.dataadapter.registry.GroupNode;
import com.tbyf.dataadapter.registry.WorkerNode;
import com.tbyf.dataadapter.registry.WorkerRegistry; import com.tbyf.dataadapter.registry.WorkerRegistry;
import com.tbyf.dataadapter.task.TaskDef;
import com.tbyf.dataadapter.task.processor.config.PipelineDef;
import com.tbyf.dataadapter.task.processor.config.ProcessorConfig;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@RestController @RestController
...@@ -23,11 +26,70 @@ public class WorkerController { ...@@ -23,11 +26,70 @@ public class WorkerController {
@GetMapping("/groups") @GetMapping("/groups")
public Result<?> getWorkerGroups() { public Result<?> getWorkerGroups() {
List<GroupNode> workerGroups = workerRegistry.getWorkerGroups(); List<GroupNode> workerGroups = workerRegistry.getWorkerGroups();
return Result.ok(workerGroups.stream().map(g->{ return Result.ok(workerGroups.stream().map(g -> {
GroupDTO dto = new GroupDTO(); GroupDTO dto = new GroupDTO();
dto.setName(g.getName()); dto.setName(g.getName());
dto.setWorkerNodes(new ArrayList<>(g.getWorkerNodes().values())); dto.setWorkerNodes(new ArrayList<>(g.getWorkerNodes().values()));
return dto; return dto;
}).collect(Collectors.toList())); }).collect(Collectors.toList()));
} }
@GetMapping("/tasks")
public Result<?> getTasks(String group, String worker) {
WorkerNode workerNode = workerRegistry.getWorker(group, worker);
if (workerNode != null) {
WorkerBizClient client = workerNode.getClient();
return client.getTasks();
}
return Result.ok(Collections.emptyList());
}
@GetMapping("/headProcessors")
public Result<?> getHeadProcessors(String group, String worker) {
WorkerNode workerNode = workerRegistry.getWorker(group, worker);
if (workerNode != null) {
WorkerBizClient client = workerNode.getClient();
return client.getHeadProcessors();
}
return Result.ok(Collections.emptyList());
}
@GetMapping("/processors")
public Result<?> getProcessors(String group, String worker) {
WorkerNode workerNode = workerRegistry.getWorker(group, worker);
if (workerNode != null) {
WorkerBizClient client = workerNode.getClient();
return client.getProcessors();
}
return Result.ok(Collections.emptyList());
}
@PostMapping("/installPlugin")
public Result<?> installPlugin(String group, String worker, String pluginId) {
WorkerNode workerNode = workerRegistry.getWorker(group, worker);
if (workerNode != null) {
WorkerBizClient client = workerNode.getClient();
return client.installPlugin(pluginId);
}
return Result.ok(false);
}
@PostMapping("/submitTask")
public Result<?> addTask(@RequestBody TaskDefDTO task) {
WorkerNode workerNode = workerRegistry.getWorker(task.getGroup(), task.getWorker());
if (workerNode != null) {
WorkerBizClient client = workerNode.getClient();
TaskDef taskDef = new TaskDef();
taskDef.setTaskId(UUID.randomUUID().toString());
taskDef.setDescription(task.getDescription());
// todo
taskDef.setParallelism(1);
PipelineDef pd = new PipelineDef();
pd.setHead(task.getHeadProcessor());
pd.setProcessors(task.getProcessors());
taskDef.setPipelineDef(pd);
return client.submitTask(taskDef);
}
return Result.ok(false);
}
} }
package com.tbyf.dataadapter.console.model.dto;
import com.tbyf.dataadapter.task.processor.config.ProcessorConfig;
import lombok.Data;
import java.util.List;
@Data
public class TaskDefDTO {
private String group;
private String worker;
private String description;
private ProcessorConfig headProcessor;
private List<ProcessorConfig> processors;
}
...@@ -101,7 +101,7 @@ public class WorkerBizClient implements WorkerBizApi { ...@@ -101,7 +101,7 @@ public class WorkerBizClient implements WorkerBizApi {
} }
public static void main(String[] args) { public static void main(String[] args) {
WorkerBizClient client = new WorkerBizClient("http://localhost:8081"); WorkerBizClient client = new WorkerBizClient("http://192.168.11.40:8084");
System.out.println(client.getHeadProcessors()); System.out.println(client.getHeadProcessors());
System.out.println(client.getProcessors()); System.out.println(client.getProcessors());
Result<Boolean> r = client.installPlugin("new-plugin-1.1-SNAPSHOT"); Result<Boolean> r = client.installPlugin("new-plugin-1.1-SNAPSHOT");
...@@ -132,7 +132,8 @@ public class WorkerBizClient implements WorkerBizApi { ...@@ -132,7 +132,8 @@ public class WorkerBizClient implements WorkerBizApi {
taskDef.setPipelineDef(def); taskDef.setPipelineDef(def);
System.out.println(client.submitTask(taskDef)); System.out.println(client.submitTask(taskDef));
System.out.println(client.startTask("02")); System.out.println(client.startTask("02"));
Result<List<Task>> tasks = client.getTasks();
client.stopTask("02"); System.out.println(tasks);
//client.stopTask("02");
} }
} }
...@@ -10,4 +10,5 @@ public interface WorkerRegistry { ...@@ -10,4 +10,5 @@ public interface WorkerRegistry {
List<GroupNode> getWorkerGroups(); List<GroupNode> getWorkerGroups();
WorkerNode getWorker(String group, String worker);
} }
...@@ -109,4 +109,9 @@ public class ZookeeperWorkerRegistry implements WorkerRegistry { ...@@ -109,4 +109,9 @@ public class ZookeeperWorkerRegistry implements WorkerRegistry {
return new ArrayList<>(groupNodes.values()); return new ArrayList<>(groupNodes.values());
} }
@Override
public WorkerNode getWorker(String group, String worker) {
return groupNodes.get(group).getWorkerNodes().get(worker);
}
} }
...@@ -115,4 +115,10 @@ public class ProcessorManager { ...@@ -115,4 +115,10 @@ public class ProcessorManager {
public Path getPluginPath(String pluginId) { public Path getPluginPath(String pluginId) {
return pluginManager.getPlugin(pluginId).getPluginPath(); return pluginManager.getPlugin(pluginId).getPluginPath();
} }
public List<String> getPlugins() {
return pluginManager.getPlugins().stream().map(PluginWrapper::getPluginId).collect(Collectors.toList());
}
} }
...@@ -6,6 +6,7 @@ import okhttp3.logging.HttpLoggingInterceptor; ...@@ -6,6 +6,7 @@ import okhttp3.logging.HttpLoggingInterceptor;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
public abstract class HttpClient { public abstract class HttpClient {
...@@ -18,6 +19,7 @@ public abstract class HttpClient { ...@@ -18,6 +19,7 @@ public abstract class HttpClient {
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor(); HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
loggingInterceptor.level(HttpLoggingInterceptor.Level.BASIC); loggingInterceptor.level(HttpLoggingInterceptor.Level.BASIC);
client = new OkHttpClient.Builder() client = new OkHttpClient.Builder()
.readTimeout(30, TimeUnit.SECONDS)
.addInterceptor(loggingInterceptor) .addInterceptor(loggingInterceptor)
.build(); .build();
} }
......
...@@ -3,4 +3,4 @@ group=biz03 ...@@ -3,4 +3,4 @@ group=biz03
registryAddress=127.0.0.1:2181 registryAddress=127.0.0.1:2181
port=8084 port=8084
pluginsRoot=plugin01 pluginsRoot=plugin01
pluginRepositoryUrl=http://localhost:8082/plugin/download pluginRepositoryUrl=http://localhost:18080/plugin/download
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment