Commit d39055e8 by hy

project init

target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
.idea/
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.tbyf.dataadapter</groupId>
<artifactId>dataadapter</artifactId>
<version>3.0-SNAPSHOT</version>
</parent>
<artifactId>dataadapter-core</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
package com.tbyf.dataadapter;
public class TimeoutException extends RuntimeException {
public TimeoutException() {
super();
}
public TimeoutException(String message) {
super(message);
}
}
package com.tbyf.dataadapter;
import com.tbyf.dataadapter.api.impl.WorkerBizImpl;
import com.tbyf.dataadapter.http.HttpRequest;
import com.tbyf.dataadapter.http.HttpResponse;
import com.tbyf.dataadapter.http.RequestHandler;
import com.tbyf.dataadapter.http.SimpleHttpServer;
import com.tbyf.dataadapter.registry.WorkerRegistry;
import com.tbyf.dataadapter.registry.support.ZookeeperWorkerRegistry;
import com.tbyf.dataadapter.util.NetUtils;
import lombok.Getter;
import lombok.Setter;
import java.net.UnknownHostException;
import java.util.UUID;
public class Worker {
private final WorkerProperties props;
@Getter
private final String name;
@Getter
private final String group;
@Getter
private final String embeddedServerAddress;
private final WorkerRegistry workerRegistry;
@Setter
private RequestHandler requestHandler;
public Worker(WorkerProperties props) {
props.validate();
this.props = props;
this.name = props.getName();
this.group = props.getGroup();
this.workerRegistry = new ZookeeperWorkerRegistry(props.getRegistryAddress());
try {
this.embeddedServerAddress = NetUtils.getLocalHost() + ":" + props.getPort();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
public void start() {
RequestHandler requestHandler = this.requestHandler;
if (requestHandler == null) {
throw new IllegalStateException("requestHandler not set");
}
workerRegistry.registerWorker(this);
SimpleHttpServer server = new SimpleHttpServer(props.getPort(), requestHandler);
server.start();
}
public static void main(String[] args) {
WorkerProperties props = new WorkerProperties();
Worker worker = new Worker(props);
WorkerApiRequestHandler handler = new WorkerApiRequestHandler(new WorkerBizImpl("plugins001", "http://localhost:8082/plugin/download"));
worker.setRequestHandler(handler);
worker.start();
}
}
package com.tbyf.dataadapter;
import com.tbyf.dataadapter.api.WorkerBizApi;
import com.tbyf.dataadapter.api.vo.Result;
import com.tbyf.dataadapter.http.HttpRequest;
import com.tbyf.dataadapter.http.HttpResponse;
import com.tbyf.dataadapter.http.RequestHandler;
import com.tbyf.dataadapter.task.TaskDef;
import com.tbyf.dataadapter.util.JsonUtils;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class WorkerApiRequestHandler implements RequestHandler {
private final WorkerBizApi workerBizApi;
public WorkerApiRequestHandler(WorkerBizApi workerBizApi) {
this.workerBizApi = workerBizApi;
}
@Override
public HttpResponse handle(HttpRequest request) throws Exception {
Object result = null;
switch (request.getUri()) {
case "/installPlugin":
result = workerBizApi.installPlugin(request.getContent());
break;
case "/submitTask":
TaskDef def = JsonUtils.fromJson(request.getContent(), TaskDef.class);
result = workerBizApi.submitTask(def);
break;
case "/startTask":
result = workerBizApi.startTask(request.getContent());
break;
case "/stopTask":
result = workerBizApi.stopTask(request.getContent());
break;
case "/getProcessors":
result = workerBizApi.getProcessors();
break;
case "/getHeadProcessors":
result = workerBizApi.getHeadProcessors();
break;
case "/getTasks":
result = workerBizApi.getTasks();
break;
}
if (result == null) {
return HttpResponse.NOT_FOUND;
} else {
return HttpResponse.json(JsonUtils.toJson(result));
}
}
}
package com.tbyf.dataadapter;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
@Data
public class WorkerProperties {
/**
* unique name in the group
*/
private String name = "worker0";
/**
* the group that this worker belongs to
*/
private String group = "group0";
private String registryAddress = "127.0.0.1:2181";
/**
* the embedded http server's port within this worker
*/
private int port = 8081;
public void validate() {
if (StringUtils.isBlank(name)) {
throw new IllegalStateException("name not set");
}
if (StringUtils.isBlank(group)) {
throw new IllegalStateException("group not set");
}
if (StringUtils.isBlank(registryAddress)) {
throw new IllegalStateException("registryAddress not set");
}
}
}
package com.tbyf.dataadapter.api;
import com.tbyf.dataadapter.api.vo.ProcessorVo;
import com.tbyf.dataadapter.api.vo.Result;
import com.tbyf.dataadapter.task.Task;
import com.tbyf.dataadapter.task.TaskDef;
import java.util.List;
public interface WorkerBizApi {
/**
* install plugin from remote plugin repository
*/
Result<Boolean> installPlugin(String pluginId);
Result<Boolean> submitTask(TaskDef taskDef);
Result<Boolean> startTask(String taskId);
Result<Boolean> stopTask(String taskId);
Result<List<Task>> getTasks();
Result<List<ProcessorVo>> getProcessors();
Result<List<ProcessorVo>> getHeadProcessors();
}
package com.tbyf.dataadapter.api.impl;
import com.google.gson.reflect.TypeToken;
import com.tbyf.dataadapter.api.WorkerBizApi;
import com.tbyf.dataadapter.api.vo.ProcessorVo;
import com.tbyf.dataadapter.api.vo.Result;
import com.tbyf.dataadapter.task.Task;
import com.tbyf.dataadapter.task.TaskDef;
import com.tbyf.dataadapter.task.processor.config.PipelineDef;
import com.tbyf.dataadapter.task.processor.config.ProcessorConfig;
import com.tbyf.dataadapter.util.HttpClient;
import com.tbyf.dataadapter.util.JsonUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class WorkerBizClient implements WorkerBizApi {
private final String workerUrl;
public WorkerBizClient(String workerUrl) {
this.workerUrl = workerUrl;
}
@Override
public Result<Boolean> installPlugin(String pluginId) {
try {
String result = HttpClient.post(workerUrl + "/installPlugin", pluginId);
return JsonUtils.fromJson(result, new TypeToken<Result<Boolean>>() {
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Result<Boolean> submitTask(TaskDef taskDef) {
try {
String result = HttpClient.post(workerUrl + "/submitTask", JsonUtils.toJson(taskDef));
return JsonUtils.fromJson(result, new TypeToken<Result<Boolean>>() {
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Result<Boolean> startTask(String taskId) {
try {
String result = HttpClient.post(workerUrl + "/startTask", taskId);
return JsonUtils.fromJson(result, new TypeToken<Result<Boolean>>() {
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Result<Boolean> stopTask(String taskId) {
try {
String result = HttpClient.post(workerUrl + "/stopTask", taskId);
return JsonUtils.fromJson(result, new TypeToken<Result<Boolean>>() {
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Result<List<Task>> getTasks() {
try {
String result = HttpClient.get(workerUrl + "/getTasks");
return JsonUtils.fromJson(result, new TypeToken<Result<List<Task>>>() {
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Result<List<ProcessorVo>> getProcessors() {
try {
String result = HttpClient.get(workerUrl + "/getProcessors");
return JsonUtils.fromJson(result, new TypeToken<Result<List<ProcessorVo>>>() {
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Result<List<ProcessorVo>> getHeadProcessors() {
try {
String result = HttpClient.get(workerUrl + "/getHeadProcessors");
return JsonUtils.fromJson(result, new TypeToken<Result<List<ProcessorVo>>>() {
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
WorkerBizClient client = new WorkerBizClient("http://localhost:8081");
System.out.println(client.getHeadProcessors());
System.out.println(client.getProcessors());
Result<Boolean> r = client.installPlugin("new-plugin-1.1-SNAPSHOT");
System.out.println(r);
System.out.println(client.getProcessors());
System.out.println(client.getHeadProcessors());
TaskDef taskDef = new TaskDef();
taskDef.setTaskId("02");
taskDef.setParallelism(3);
PipelineDef def = new PipelineDef();
ProcessorConfig headCfg = new ProcessorConfig();
headCfg.setProcessorId("new-plugin-1.1-SNAPSHOT@com.hy.example.HP");
headCfg.setConfigStr("uuidL=17");
def.setHead(headCfg);
ProcessorConfig p1 = new ProcessorConfig();
p1.setProcessorId("new-plugin-1.1-SNAPSHOT@com.hy.example.DP");
p1.setConfigStr("desc=ok:");
ProcessorConfig p2 = new ProcessorConfig();
p2.setProcessorId("new-plugin-1.1-SNAPSHOT@com.hy.example.DP");
p2.setConfigStr("desc=test:");
def.setProcessors(Arrays.asList(p1, p2));
taskDef.setPipelineDef(def);
System.out.println(client.submitTask(taskDef));
System.out.println(client.startTask("02"));
client.stopTask("02");
}
}
package com.tbyf.dataadapter.api.impl;
import com.tbyf.dataadapter.api.WorkerBizApi;
import com.tbyf.dataadapter.api.vo.ProcessorVo;
import com.tbyf.dataadapter.api.vo.Result;
import com.tbyf.dataadapter.task.Task;
import com.tbyf.dataadapter.task.TaskDef;
import com.tbyf.dataadapter.task.TaskManager;
import com.tbyf.dataadapter.task.processor.DataProcessor;
import com.tbyf.dataadapter.task.processor.plugin.DataProcessorExtension;
import com.tbyf.dataadapter.task.processor.plugin.HeadProcessorExtension;
import com.tbyf.dataadapter.task.processor.plugin.ProcessorManager;
import com.tbyf.dataadapter.util.HttpClient;
import lombok.Setter;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class WorkerBizImpl implements WorkerBizApi {
private final TaskManager taskManager;
private final String pluginsRoot;
/**
* the remote plugin repository address
*/
private final String pluginRepository;
public WorkerBizImpl(String pluginsRoot, String pluginRepository) {
this.pluginsRoot = pluginsRoot;
ProcessorManager.setPluginsRoot(pluginsRoot);
this.taskManager = new TaskManager();
this.pluginRepository = pluginRepository;
}
@Override
public Result<Boolean> installPlugin(String pluginId) {
Path dest = Paths.get(pluginsRoot, pluginId + ".jar");
ProcessorManager processorManager = taskManager.getProcessorManager();
String existingPlugin = processorManager.getPluginIdForPath(dest);
if (existingPlugin != null) {
processorManager.deletePlugin(existingPlugin);
}
try {
HttpClient.download(getPluginDownloadUrl(pluginId), dest);
processorManager.installPlugin(dest.toString());
return Result.ok(true);
} catch (IOException e) {
return Result.ok(false);
}
}
private String getPluginDownloadUrl(String pluginId) {
return pluginRepository + "/" + pluginId;
}
@Override
public Result<Boolean> submitTask(TaskDef taskDef) {
try {
taskManager.submitTask(taskDef);
return Result.ok(true);
} catch (Exception e) {
return Result.ok(false);
}
}
@Override
public Result<Boolean> startTask(String taskId) {
try {
taskManager.startTask(taskId);
return Result.ok(true);
} catch (Exception e) {
return Result.ok(false);
}
}
@Override
public Result<Boolean> stopTask(String taskId) {
try {
taskManager.terminateTask(taskId);
return Result.ok(true);
} catch (Exception e) {
return Result.ok(false);
}
}
@Override
public Result<List<Task>> getTasks() {
return Result.ok(taskManager.getTasks());
}
@Override
public Result<List<ProcessorVo>> getProcessors() {
ProcessorManager processorManager = taskManager.getProcessorManager();
Map<String, DataProcessorExtension> processors = processorManager.getProcessors();
List<ProcessorVo> result = new ArrayList<>();
for (String key : processors.keySet()) {
String[] split = key.split("@");
String pluginId = split[0];
String processorClassname = split[1];
String desc = processors.get(key).description();
ProcessorVo vo = new ProcessorVo();
vo.setPluginId(pluginId);
vo.setProcessorClassname(processorClassname);
vo.setDesc(desc);
result.add(vo);
}
return Result.ok(result);
}
@Override
public Result<List<ProcessorVo>> getHeadProcessors() {
ProcessorManager processorManager = taskManager.getProcessorManager();
Map<String, HeadProcessorExtension> processors = processorManager.getHeadProcessors();
List<ProcessorVo> result = new ArrayList<>();
for (String key : processors.keySet()) {
String[] split = key.split("@");
String pluginId = split[0];
String processorClassname = split[1];
String desc = processors.get(key).description();
ProcessorVo vo = new ProcessorVo();
vo.setPluginId(pluginId);
vo.setProcessorClassname(processorClassname);
vo.setDesc(desc);
result.add(vo);
}
return Result.ok(result);
}
}
package com.tbyf.dataadapter.api.vo;
import lombok.Data;
@Data
public class ProcessorVo {
private String pluginId;
private String processorClassname;
private String desc;
}
package com.tbyf.dataadapter.api.vo;
import com.google.gson.Gson;
import lombok.Data;
@Data
public class Result<T> {
private T data;
private String msg;
private int code;
public static <T> Result<T> ok(T data) {
return build(data, "ok", 200);
}
private static <T> Result<T> build(T data, String msg, int code) {
Result<T> r = new Result<>();
r.data = data;
r.msg = msg;
r.code = code;
return r;
}
}
package com.tbyf.dataadapter.console;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DataAdapterConsoleApplication {
public static void main(String[] args) {
SpringApplication.run(DataAdapterConsoleApplication.class, args);
}
}
package com.tbyf.dataadapter.console.config;
import com.tbyf.dataadapter.registry.WorkerRegistry;
import com.tbyf.dataadapter.registry.support.ZookeeperWorkerRegistry;
import com.tbyf.dataadapter.task.processor.plugin.ProcessorManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DataAdapterConsoleConfig {
@Value("${pluginsRepoLocation}")
private String pluginsRoot;
@Value("${zk}")
private String zk;
@Bean
public ProcessorManager processorManager() {
ProcessorManager.setPluginsRoot(pluginsRoot);
ProcessorManager processorManager = new ProcessorManager();
processorManager.start();
return processorManager;
}
@Bean
public WorkerRegistry workerRegistry() {
return new ZookeeperWorkerRegistry(zk);
}
}
package com.tbyf.dataadapter.console.controlle;
import com.tbyf.dataadapter.task.processor.plugin.ProcessorManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.http.ContentDisposition;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.util.ResourceUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.file.Files;
import java.nio.file.Path;
@RestController
@RequestMapping("/plugin")
public class PluginController {
@Autowired
ResourceLoader resourceLoader;
@Autowired
ProcessorManager processorManager;
@GetMapping("/download/{pluginId}")
public ResponseEntity<?> download(@PathVariable String pluginId) {
Path pluginPath = this.processorManager.getPluginPath(pluginId);
if (pluginPath == null || !Files.exists(pluginPath)) {
return ResponseEntity.notFound().build();
}
Resource resource = this.resourceLoader.getResource(ResourceUtils.FILE_URL_PREFIX + pluginPath);
HttpHeaders headers = new HttpHeaders();
headers.setContentDisposition(
ContentDisposition.attachment()
.filename(pluginPath.getFileName().toString())
.build());
return ResponseEntity.ok()
.headers(headers)
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(resource);
}
}
package com.tbyf.dataadapter.console.controlle;
import com.tbyf.dataadapter.api.vo.Result;
import com.tbyf.dataadapter.registry.GroupNode;
import com.tbyf.dataadapter.registry.WorkerRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping("/worker")
public class WorkerController {
@Autowired
WorkerRegistry workerRegistry;
@GetMapping("/groups")
public Result<?> getWorkerGroups() {
List<GroupNode> workerGroups = workerRegistry.getWorkerGroups();
return Result.ok(workerGroups);
}
}
package com.tbyf.dataadapter.http;
/**
* most common content types
*/
public class ContentType {
public static final String TEXT_HTML = "text/html; charset=UTF-8";
public static final String APPLICATION_JSON = "application/json; charset=UTF-8";
}
package com.tbyf.dataadapter.http;
import lombok.Builder;
import lombok.Data;
@Builder
@Data
public class HttpRequest {
private String uri;
private String content;
private String method;
}
package com.tbyf.dataadapter.http;
import lombok.Data;
@Data
public class HttpResponse {
public static final String EMPTY_CONTENT = "";
public static final HttpResponse EMPTY_RESPONSE = textHtml(EMPTY_CONTENT);
public static final HttpResponse NOT_FOUND = textHtml("404");
public static final HttpResponse ERROR = textHtml("500");
private String content;
private String contentType;
private static HttpResponse build(String content, String contentType) {
HttpResponse response = new HttpResponse();
response.content = content;
response.contentType = contentType;
return response;
}
public static HttpResponse json(String json) {
return build(json, ContentType.APPLICATION_JSON);
}
public static HttpResponse textHtml(String textHtml) {
return build(textHtml, ContentType.TEXT_HTML);
}
}
package com.tbyf.dataadapter.http;
public interface RequestHandler {
HttpResponse handle(HttpRequest request) throws Exception;
}
package com.tbyf.dataadapter.http;
import com.tbyf.dataadapter.util.Assert;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* a simple HTTP server used by the worker
*/
@Slf4j
public class SimpleHttpServer {
private final int port;
private Thread t;
private final RequestHandler requestHandler;
public SimpleHttpServer(int port, RequestHandler requestHandler) {
Assert.notNull(requestHandler, "request handler must not be null");
this.port = port;
this.requestHandler = requestHandler;
}
public void start() {
t = new Thread(() -> {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS))
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // 5M
.addLast(new SimpleHttpRequestHandler());
}
});
Channel ch = null;
try {
ch = b.bind(port).sync().channel();
ch.closeFuture().sync();
} catch (InterruptedException e) {
// manually stop server
log.info("stopping embedded server");
Thread.currentThread().interrupt();
}
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}, "embedded-server-main-thread");
// t.setDaemon(true);
t.start();
}
public void stop() {
t.interrupt();
}
private class SimpleHttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext context, FullHttpRequest msg) throws Exception {
HttpRequest request = buildFromFullHttpRequest(msg);
HttpResponse response = null;
try {
response = requestHandler.handle(request);
} catch (Exception e) {
log.error(e.getMessage(), e);
response = HttpResponse.ERROR;
}
context.writeAndFlush(buildFromHttpResponse(response));
}
private FullHttpResponse buildFromHttpResponse(HttpResponse response) {
response = response != null ? response : HttpResponse.NOT_FOUND;
DefaultFullHttpResponse result = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.copiedBuffer(response.getContent(), CharsetUtil.UTF_8));
result.headers().set(HttpHeaderNames.CONTENT_TYPE, response.getContentType());
result.headers().set(HttpHeaderNames.CONTENT_LENGTH, result.content().readableBytes());
return result;
}
private HttpRequest buildFromFullHttpRequest(FullHttpRequest request) {
return HttpRequest.builder()
.uri(request.uri())
.content(request.content().toString(CharsetUtil.UTF_8))
.method(request.method().name())
.build();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
ctx.close();
} else {
super.userEventTriggered(ctx, evt);
}
}
}
public static void main(String[] args) {
AtomicBoolean stopServer = new AtomicBoolean();
SimpleHttpServer server = new SimpleHttpServer(8080, request -> {
System.out.println(request);
if ("/stop".equals(request.getUri())) {
stopServer.set(true);
}
if ("/testErr".equals(request.getUri())) {
throw new RuntimeException("for testing");
}
if ("/404".equals(request.getUri())) {
return null;
}
return HttpResponse.textHtml(UUID.randomUUID().toString());
});
server.start();
// TODO: is polling in a loop the best way to wait for shutdown here?
while (!stopServer.get()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
server.stop();
}
}
package com.tbyf.dataadapter.registry;
import lombok.Data;
import java.util.HashSet;
import java.util.Set;
@Data
public class GroupNode {
private String name;
private Set<WorkerNode> workerNodes = new HashSet<>();
}
package com.tbyf.dataadapter.registry;
import lombok.Data;
@Data
public class WorkerNode {
private String name;
private String address;
}
package com.tbyf.dataadapter.registry;
import com.tbyf.dataadapter.Worker;
import java.util.List;
public interface WorkerRegistry {
void registerWorker(Worker worker);
List<GroupNode> getWorkerGroups();
}
package com.tbyf.dataadapter.registry.support;
import com.tbyf.dataadapter.Worker;
public class ZkPathHelper {
public static final String TOP_ROOT_PATH = "/tbyf";
public static final String WORKER_GROUP_ROOT_PATH = TOP_ROOT_PATH + "/group";
public static final String WORKER_GROUP_LOCK_PATH = TOP_ROOT_PATH + "/groupLock";
public static String getWorkerPath(Worker worker) {
return WORKER_GROUP_ROOT_PATH + "/" + worker.getGroup() + "/" + worker.getName();
}
public static String getGroupPath(String groupName) {
return WORKER_GROUP_ROOT_PATH + "/" + groupName;
}
/**
* lock path used when a worker must do something exclusively with respect to other workers in the same group
*/
public static String getLockPathForWorkerInGroup(String groupName, String lockName) {
return WORKER_GROUP_LOCK_PATH + "/" + lockName + "/" + groupName;
}
}
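To make the resulting ZNode layout concrete, here is a small illustrative check (the example class below is not part of the commit) that prints the paths these helpers produce for the default group name from WorkerProperties and the lock name used by ZookeeperWorkerRegistry:
package com.tbyf.dataadapter.registry.support;

// Illustrative only: shows the ZNode paths ZkPathHelper produces for the
// default group "group0" and the "registerWorker" lock.
public class ZkPathHelperExample {
    public static void main(String[] args) {
        // group node: /tbyf/group/group0
        System.out.println(ZkPathHelper.getGroupPath("group0"));
        // lock node used by ZookeeperWorkerRegistry.registerWorker: /tbyf/groupLock/registerWorker/group0
        System.out.println(ZkPathHelper.getLockPathForWorkerInGroup("group0", "registerWorker"));
        // a worker ZNode is the group path plus the worker name, e.g. /tbyf/group/group0/worker0
    }
}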
package com.tbyf.dataadapter.registry.support;
import com.tbyf.dataadapter.TimeoutException;
import com.tbyf.dataadapter.Worker;
import com.tbyf.dataadapter.registry.GroupNode;
import com.tbyf.dataadapter.registry.WorkerNode;
import com.tbyf.dataadapter.registry.WorkerRegistry;
import com.tbyf.dataadapter.util.ZkUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class ZookeeperWorkerRegistry implements WorkerRegistry {
private final CuratorFramework zkClient;
private final Map<String, GroupNode> groupNodes = new ConcurrentHashMap<>();
public ZookeeperWorkerRegistry(String zkUrl) {
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(zkUrl, new ExponentialBackoffRetry(3000, 10));
zkClient.start();
try {
if (!zkClient.blockUntilConnected(30, TimeUnit.SECONDS)) {
throw new TimeoutException("zookeeper connection timeout");
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
this.zkClient = zkClient;
try {
init();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void init() throws Exception {
PathChildrenCache cache = new PathChildrenCache(this.zkClient, ZkPathHelper.WORKER_GROUP_ROOT_PATH, true);
cache.getListenable().addListener((client, event) -> {
ChildData data = event.getData();
if (data == null) {
return;
}
if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
String groupName = ZKPaths.getNodeFromPath(data.getPath());
GroupNode groupNode = new GroupNode();
groupNode.setName(groupName);
groupNodes.put(groupName, groupNode);
onGroupAdded(groupName);
}
});
cache.start();
}
private void onGroupAdded(String groupName) throws Exception {
PathChildrenCache cache = new PathChildrenCache(
this.zkClient, ZkPathHelper.getGroupPath(groupName), true);
cache.getListenable().addListener((client, event) -> {
ChildData data = event.getData();
if (data == null) {
return;
}
String workerName = ZKPaths.getNodeFromPath(data.getPath());
String workerUrl = new String(data.getData(), StandardCharsets.UTF_8);
WorkerNode workerNode = new WorkerNode();
workerNode.setName(workerName);
workerNode.setAddress(workerUrl);
switch (event.getType()) {
case CHILD_ADDED:
groupNodes.get(groupName).getWorkerNodes().add(workerNode);
break;
case CHILD_REMOVED:
groupNodes.get(groupName).getWorkerNodes().remove(workerNode);
break;
}
});
cache.start();
}
@Override
public void registerWorker(Worker worker) {
String workerPath = ZkPathHelper.getWorkerPath(worker);
String lockPath = ZkPathHelper.getLockPathForWorkerInGroup(worker.getGroup(), "registerWorker");
ZkUtils.runWithinDistributedLock(zkClient, lockPath, () -> {
if (!ZkUtils.exists(zkClient, workerPath)) {
ZkUtils.createEphemeralZNode(zkClient, workerPath, worker.getEmbeddedServerAddress());
} else {
throw new IllegalStateException("worker for path [" + workerPath + "] already exists");
}
});
}
@Override
public List<GroupNode> getWorkerGroups() {
return new ArrayList<>(groupNodes.values());
}
}
package com.tbyf.dataadapter.task;
import lombok.Data;
@Data
public class Task {
private String id;
private String description;
private int parallelism;
private TaskState state;
}
package com.tbyf.dataadapter.task;
import com.tbyf.dataadapter.task.processor.config.PipelineDef;
import lombok.Data;
@Data
public class TaskDef {
String taskId;
String description;
int parallelism;
PipelineDef pipelineDef;
}
package com.tbyf.dataadapter.task;
import com.tbyf.dataadapter.task.processor.*;
import com.tbyf.dataadapter.task.processor.config.PipelineDef;
import com.tbyf.dataadapter.task.processor.config.ProcessorConfig;
import com.tbyf.dataadapter.task.processor.plugin.ProcessorManager;
import com.tbyf.dataadapter.util.Utils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class TaskManager {
@Getter
private final ProcessorManager processorManager;
private final Map<String, ProcessorPipeline> pipelines = new HashMap<>();
private final Map<String, TaskState> taskStateMap = new HashMap<>();
private final Map<String, List<TaskThread>> taskExecutors = new HashMap<>();
private final Map<String, Task> tasks = new ConcurrentHashMap<>();
public TaskManager() {
processorManager = new ProcessorManager();
processorManager.start();
}
public synchronized void submitTask(TaskDef taskDef) {
String taskId = taskDef.getTaskId();
if (pipelines.containsKey(taskId)) {
throw new IllegalStateException("task for id [" + taskId + "] already exists");
}
ProcessorPipeline pipeline = buildPipeline(taskDef.getPipelineDef());
pipelines.put(taskId, pipeline);
taskStateMap.put(taskId, TaskState.NEW);
Task task = new Task();
task.setId(taskId);
task.setDescription(taskDef.getDescription());
task.setState(TaskState.NEW);
int cpuNums = Utils.getCpuNums();
int parallelism = taskDef.getParallelism();
if (parallelism > cpuNums) {
log.warn("task[" + taskId + "] parallelism set too large, reset it to " + cpuNums);
parallelism = cpuNums;
}
if (parallelism < 1) {
parallelism = 1;
}
task.setParallelism(parallelism);
tasks.put(taskId, task);
}
private ProcessorPipeline buildPipeline(PipelineDef def) {
ProcessorConfig head = def.getHead();
String headId = head.getProcessorId();
HeadProcessor headProcessor = processorManager.getHeadProcessors().get(headId);
if (headProcessor == null) {
throw new IllegalStateException("head processor for id [" + headId + "] not found");
}
headProcessor.configure(Configuration.from(head.getConfigStr()));
DefaultProcessorPipeline pipeline = new DefaultProcessorPipeline(headProcessor);
for (ProcessorConfig cfg : def.getProcessors()) {
String id = cfg.getProcessorId();
DataProcessor processor = processorManager.getProcessors().get(id);
if (processor == null) {
throw new IllegalStateException("processor for id [" + id + "] not found ");
}
processor.configure(Configuration.from(cfg.getConfigStr()));
pipeline.addLast(processor);
}
return pipeline;
}
public synchronized void startTask(String taskId) {
TaskState taskState = taskStateMap.get(taskId);
if (taskState == TaskState.NEW) {
ProcessorPipeline pipeline = pipelines.get(taskId);
List<TaskThread> ts = new ArrayList<>();
int parallelism = tasks.get(taskId).getParallelism();
for (int i = 0; i < parallelism; i++) {
TaskThread t = new TaskThread(taskId + "-" + i, pipeline::process);
((DefaultProcessorContext) pipeline.context()).addShutdownHook(t::terminate);
t.addShutdownHook(() -> terminateTask(taskId));
t.start();
ts.add(t);
}
taskExecutors.put(taskId, ts);
taskStateMap.put(taskId, TaskState.STARTED);
updateState(taskId, TaskState.STARTED);
}
}
public synchronized void pauseTask(String taskId) {
TaskState taskState = taskStateMap.get(taskId);
if (taskState == TaskState.STARTED) {
taskStateMap.put(taskId, TaskState.PAUSED);
taskExecutors.get(taskId).forEach(TaskThread::pause);
updateState(taskId, TaskState.PAUSED);
}
}
public synchronized void resumeTask(String taskId) {
TaskState taskState = taskStateMap.get(taskId);
if (taskState == TaskState.PAUSED) {
taskStateMap.put(taskId, TaskState.STARTED);
taskExecutors.get(taskId).forEach(TaskThread::proceed);
updateState(taskId, TaskState.STARTED);
}
}
public synchronized void terminateTask(String taskId) {
TaskState taskState = taskStateMap.get(taskId);
if (taskState == TaskState.STARTED || taskState == TaskState.PAUSED) {
taskStateMap.put(taskId, TaskState.TERMINATED);
taskExecutors.get(taskId).forEach(TaskThread::terminate);
updateState(taskId, TaskState.TERMINATED);
}
}
private void updateState(String taskId, TaskState state) {
tasks.get(taskId).setState(state);
}
public List<Task> getTasks() {
return new ArrayList<>(tasks.values());
}
public static void main(String[] args) {
ProcessorManager.setPluginsRoot("plugins001");
TaskManager taskManager = new TaskManager();
TaskDef taskDef = new TaskDef();
taskDef.setParallelism(99);
taskDef.setTaskId("1");
// taskDef.setParallelism(2);
PipelineDef pDef = new PipelineDef();
ProcessorConfig head = new ProcessorConfig();
head.setProcessorId("new-plugin-1.1-SNAPSHOT@com.hy.example.HP");
head.setConfigStr("uuidL=3\na=b");
// new-plugin-1.1-SNAPSHOT@com.hy.example.DP=com.hy.example.DP
ProcessorConfig p1 = new ProcessorConfig();
p1.setProcessorId("new-plugin-1.1-SNAPSHOT@com.hy.example.DP");
p1.setConfigStr("desc=描述");
ProcessorConfig p2 = new ProcessorConfig();
p2.setProcessorId("new-plugin-1.1-SNAPSHOT@com.hy.example.DP2");
pDef.setHead(head);
pDef.setProcessors(Arrays.asList(p1, p2));
taskDef.setPipelineDef(pDef);
taskManager.submitTask(taskDef);
taskManager.startTask("1");
}
}
package com.tbyf.dataadapter.task;
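/**
* Task lifecycle states as driven by TaskManager: NEW -> STARTED,
* STARTED <-> PAUSED, and STARTED or PAUSED -> TERMINATED.
*/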
public enum TaskState {
NEW,
STARTED,
PAUSED,
TERMINATED
}
package com.tbyf.dataadapter.task;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class TaskThread extends Thread {
private volatile boolean paused = false;
private volatile boolean terminated = false;
private final Runnable task;
private final List<Runnable> shutdownHooks = new ArrayList<>();
public TaskThread(String taskId, Runnable task) {
setName(taskId);
this.task = task;
}
public void addShutdownHook(Runnable hook) {
shutdownHooks.add(hook);
}
public void pause() {
paused = true;
}
public void proceed() {
paused = false;
}
public void terminate() {
terminated = true;
for (Runnable hook : shutdownHooks) {
hook.run();
}
}
@Override
public void run() {
while (!terminated) {
while (paused && !terminated) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (terminated) {
break;
}
try {
task.run();
} catch (Throwable e) {
log.error(e.getMessage(), e);
terminate();
return;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
package com.tbyf.dataadapter.task.processor;
import com.google.gson.Gson;
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
public class Configuration {
private final Properties props;
public Configuration(Properties props) {
this.props = props;
}
public String getString(String key) {
return props.getProperty(key);
}
public Integer getInt(String key) {
String val = props.getProperty(key);
return val == null ? null : Integer.parseInt(val);
}
public <T> T getObject(String key, Class<T> type) {
String val = props.getProperty(key);
if (val == null) {
return null;
}
Gson gson = new Gson();
return gson.fromJson(val, type);
}
public static Configuration from(String str) {
Properties properties = new Properties();
try {
properties.load(new StringReader(str));
} catch (Exception e) {
// ignore malformed config strings and fall back to empty properties
}
return new Configuration(properties);
}
}
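As a usage note (not part of the commit), the Properties-style strings passed as ProcessorConfig.configStr elsewhere in this project (for example "uuidL=17" or "desc=ok:") are parsed by Configuration.from; a minimal sketch:
package com.tbyf.dataadapter.task.processor;

// Minimal sketch, not part of the commit: Configuration.from parses a
// java.util.Properties-style config string like the configStr values used above.
public class ConfigurationExample {
    public static void main(String[] args) {
        Configuration cfg = Configuration.from("desc=ok\nuuidL=17");
        System.out.println(cfg.getString("desc")); // ok
        System.out.println(cfg.getInt("uuidL"));   // 17
    }
}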
package com.tbyf.dataadapter.task.processor;
/**
* a data processor that is dynamically added to the worker app
*/
public interface DataProcessor {
void process(ProcessorContext ctx, Object data);
void configure(Configuration config);
}
package com.tbyf.dataadapter.task.processor;
import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
public class DefaultProcessorContext implements ProcessorContext {
private final ProcessorPipeline pipeline;
private final List<Runnable> shutdownHooks = new ArrayList<>();
private final AtomicBoolean shutdown = new AtomicBoolean();
public void addShutdownHook(Runnable hook) {
shutdownHooks.add(hook);
}
public DefaultProcessorContext(ProcessorPipeline pipeline) {
this.pipeline = pipeline;
}
@Override
public void fireNext(Object data) {
DataProcessor next = pipeline.nextProcessor();
if (next != null) {
next.process(this, data);
}
}
@Override
public void terminate() {
if (shutdown.compareAndSet(false, true)) {
for (Runnable hook : shutdownHooks) {
hook.run();
}
}
}
}
package com.tbyf.dataadapter.task.processor;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class DefaultProcessorPipeline implements ProcessorPipeline {
private final HeadProcessor head;
private final List<DataProcessor> processors = new ArrayList<>();
private volatile DataProcessor[] _processors;
// per-thread cursor into _processors so multiple TaskThreads can run the same pipeline concurrently
private static final ThreadLocal<Integer> curPtr = ThreadLocal.withInitial(() -> 0);
private ProcessorContext ctx;
public DefaultProcessorPipeline(HeadProcessor head) {
this.head = head;
}
@Override
public void addLast(DataProcessor processor) {
processors.add(processor);
}
@Override
public ProcessorContext context() {
if (ctx != null) {
return ctx;
}
synchronized (this) {
if (ctx != null) {
return ctx;
}
ctx = new DefaultProcessorContext(this);
return ctx;
}
}
@Override
public HeadProcessor headProcessor() {
return head;
}
@Override
public void process() {
// freeze the processors after first processing
if (this._processors == null) {
synchronized (this) {
if (this._processors == null) {
this._processors = this.processors.toArray(new DataProcessor[0]);
}
}
}
curPtr.set(0);
headProcessor().process(context(), null);
}
@Override
public DataProcessor nextProcessor() {
DataProcessor[] _processors = this._processors;
if (_processors == null || _processors.length == 0) {
return null;
}
Integer cur = curPtr.get();
if (cur >= _processors.length) {
return null;
}
DataProcessor result = _processors[cur];
curPtr.set(cur + 1);
return result;
}
}
package com.tbyf.dataadapter.task.processor;
/**
* a data provider that fetches source data from a variety of data storage systems,
* such as Oracle, Elasticsearch, etc.
*/
public interface HeadProcessor extends DataProcessor {
@Override
default void process(ProcessorContext ctx, Object data) {
ctx.fireNext(fetch(ctx));
}
/**
* the implementation should be thread safe
*/
Object fetch(ProcessorContext ctx);
}
package com.tbyf.dataadapter.task.processor;
public interface ProcessorContext {
/**
* trigger the next processor to process the given data
*/
void fireNext(Object data);
void terminate();
}
package com.tbyf.dataadapter.task.processor;
public interface ProcessorPipeline {
void addLast(DataProcessor processor);
ProcessorContext context();
HeadProcessor headProcessor();
void process();
DataProcessor nextProcessor();
}
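To show how these pieces fit together, here is a standalone sketch (not part of the commit; the class name and strings are illustrative) that wires a DefaultProcessorPipeline by hand: the head processor's fetch result is pushed through the chain via ProcessorContext.fireNext, with the per-thread cursor reset on each process() call:
package com.tbyf.dataadapter.task.processor;

// Illustrative sketch only: wires a pipeline by hand with anonymous processors.
public class PipelineExample {
    public static void main(String[] args) {
        HeadProcessor head = new HeadProcessor() {
            @Override
            public Object fetch(ProcessorContext ctx) {
                return "payload"; // stands in for data fetched from a real source
            }

            @Override
            public void configure(Configuration config) {
                // no configuration needed for this sketch
            }
        };
        DefaultProcessorPipeline pipeline = new DefaultProcessorPipeline(head);
        pipeline.addLast(new DataProcessor() {
            @Override
            public void process(ProcessorContext ctx, Object data) {
                System.out.println("step1: " + data);
                ctx.fireNext(data + "!"); // hand the transformed data to the next processor
            }

            @Override
            public void configure(Configuration config) {
            }
        });
        pipeline.addLast(new DataProcessor() {
            @Override
            public void process(ProcessorContext ctx, Object data) {
                System.out.println("step2: " + data); // end of the chain
            }

            @Override
            public void configure(Configuration config) {
            }
        });
        pipeline.process(); // prints "step1: payload" then "step2: payload!"
    }
}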
package com.tbyf.dataadapter.task.processor.config;
import lombok.Data;
import java.util.List;
@Data
public class PipelineDef {
private ProcessorConfig head;
/**
* the other processors, excluding the head processor
*/
private List<ProcessorConfig> processors;
}
package com.tbyf.dataadapter.task.processor.config;
import lombok.Data;
@Data
public class ProcessorConfig {
/**
* pluginId@className
*/
private String processorId;
private String configStr;
}
package com.tbyf.dataadapter.task.processor.plugin;
import com.tbyf.dataadapter.task.processor.Configuration;
import com.tbyf.dataadapter.task.processor.DataProcessor;
import org.pf4j.Extension;
import org.pf4j.ExtensionPoint;
@Extension
public abstract class DataProcessorExtension implements DataProcessor, ExtensionPoint {
protected ClassLoader pluginClassLoader;
/**
* the plugin classloader that loaded the plugin that this processor belongs to
*/
public ClassLoader getPluginClassLoader() {
return pluginClassLoader;
}
public void setPluginClassLoader(ClassLoader cld) {
pluginClassLoader = cld;
}
public String description() {
return getClass().getName();
}
@Override
public final void configure(Configuration config) {
ClassLoader cur = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getPluginClassLoader());
doConfigure(config);
} finally {
Thread.currentThread().setContextClassLoader(cur);
}
}
protected abstract void doConfigure(Configuration config);
}
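For reference, a hypothetical plugin-side processor might extend this class as sketched below (not part of the commit; the class name and config key are illustrative). pf4j discovers it through the @Extension annotation, and ProcessorManager then registers it under pluginId@className:
package com.example.plugin;

import com.tbyf.dataadapter.task.processor.Configuration;
import com.tbyf.dataadapter.task.processor.ProcessorContext;
import com.tbyf.dataadapter.task.processor.plugin.DataProcessorExtension;
import org.pf4j.Extension;

// Hypothetical example processor living inside a plugin jar.
@Extension
public class UppercaseProcessor extends DataProcessorExtension {
    private String prefix = "";

    @Override
    protected void doConfigure(Configuration config) {
        // invoked with the plugin classloader as the thread context classloader
        String p = config.getString("prefix");
        this.prefix = p == null ? "" : p;
    }

    @Override
    public void process(ProcessorContext ctx, Object data) {
        String s = data == null ? "" : data.toString();
        ctx.fireNext(prefix + s.toUpperCase());
    }

    @Override
    public String description() {
        return "uppercases incoming data and prepends an optional prefix";
    }
}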
package com.tbyf.dataadapter.task.processor.plugin;
import com.tbyf.dataadapter.task.processor.HeadProcessor;
import org.pf4j.Extension;
import org.pf4j.ExtensionPoint;
@Extension
public abstract class HeadProcessorExtension extends DataProcessorExtension implements HeadProcessor, ExtensionPoint {
}
package com.tbyf.dataadapter.task.processor.plugin;
import com.tbyf.dataadapter.task.processor.DataProcessor;
import com.tbyf.dataadapter.task.processor.HeadProcessor;
import org.pf4j.DefaultPluginManager;
import org.pf4j.PluginWrapper;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class ProcessorManager {
private DefaultPluginManager pluginManager;
/**
* key: pluginId@className
*/
private final Map<String, DataProcessorExtension> processors = new ConcurrentHashMap<>();
private final Map<String, HeadProcessorExtension> headProcessors = new ConcurrentHashMap<>();
public static void setPluginsRoot(String pluginsRoot) {
System.setProperty("pf4j.pluginsDir", pluginsRoot);
}
public void start() {
pluginManager = new DefaultPluginManager();
pluginManager.loadPlugins();
pluginManager.startPlugins();
loadProcessors();
}
private void loadProcessors() {
for (PluginWrapper plugin : pluginManager.getPlugins()) {
String pluginId = plugin.getPluginId();
pluginManager.getExtensions(HeadProcessorExtension.class, pluginId)
.forEach(hp -> {
hp.setPluginClassLoader(plugin.getPluginClassLoader());
headProcessors.put(pluginId + "@" + hp.getClass().getName(), hp);
});
pluginManager.getExtensions(DataProcessorExtension.class, pluginId)
.stream().filter(hp -> !headProcessors.containsKey(pluginId + "@" + hp.getClass().getName()))
.forEach(hp -> {
hp.setPluginClassLoader(plugin.getPluginClassLoader());
processors.put(pluginId + "@" + hp.getClass().getName(), hp);
});
}
}
public synchronized void installPlugin(String pluginPath) {
String pluginId = pluginManager.loadPlugin(Paths.get(pluginPath));
pluginManager.startPlugin(pluginId);
ClassLoader cld = pluginManager.getPluginClassLoader(pluginId);
pluginManager.getExtensions(HeadProcessorExtension.class, pluginId)
.forEach(hp -> {
hp.setPluginClassLoader(cld);
headProcessors.put(pluginId + "@" + hp.getClass().getName(), hp);
});
pluginManager.getExtensions(DataProcessorExtension.class, pluginId)
.stream().filter(hp -> !headProcessors.containsKey(pluginId + "@" + hp.getClass().getName()))
.forEach(hp -> {
hp.setPluginClassLoader(cld);
processors.put(pluginId + "@" + hp.getClass().getName(), hp);
});
}
public synchronized void deletePlugin(String pluginId) {
pluginManager.deletePlugin(pluginId);
headProcessors.keySet().stream()
.filter(key -> key.startsWith(pluginId + "@")).collect(Collectors.toList())
.forEach(headProcessors::remove);
processors.keySet().stream()
.filter(key -> key.startsWith(pluginId + "@")).collect(Collectors.toList())
.forEach(processors::remove);
}
public synchronized String getPluginIdForPath(Path pluginPath) {
for (PluginWrapper plugin : this.pluginManager.getPlugins()) {
if (plugin.getPluginPath().equals(pluginPath)) {
return plugin.getPluginId();
}
}
return null;
}
public Map<String, DataProcessorExtension> getProcessors() {
return new HashMap<>(processors);
}
public Map<String, HeadProcessorExtension> getHeadProcessors() {
return new HashMap<>(headProcessors);
}
public Path getPluginPath(String pluginId) {
return pluginManager.getPlugin(pluginId).getPluginPath();
}
}
package com.tbyf.dataadapter.util;
public class Assert {
public static void notNull(Object obj, String msg) {
if (obj == null) {
throw new IllegalArgumentException(msg);
}
}
}
package com.tbyf.dataadapter.util;
import okhttp3.*;
import okhttp3.logging.HttpLoggingInterceptor;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public abstract class HttpClient {
private static final OkHttpClient client;
public static MediaType jsonMediaType = MediaType.parse("application/json; charset=utf-8");
public static MediaType textMediaType = MediaType.parse("application/text; charset=utf-8");
static {
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
loggingInterceptor.level(HttpLoggingInterceptor.Level.BASIC);
client = new OkHttpClient.Builder()
.addInterceptor(loggingInterceptor)
.build();
}
public static void download(String url, Path dest) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
try (Response response = client.newCall(request).execute()) {
if (response.body() == null || response.body().contentLength() == 0) {
throw new IllegalStateException("download failed");
}
Files.copy(response.body().byteStream(), dest);
}
}
public static String post(String url, String reqBody) throws IOException {
url = addHttpSchemaIfNeeded(url);
Request request = new Request.Builder()
.url(url)
.post(RequestBody.create(reqBody, textMediaType))
.build();
try (Response response = client.newCall(request).execute()) {
if (response.body() != null) {
return response.body().string();
} else {
return null;
}
}
}
public static String get(String url) throws IOException {
url = addHttpSchemaIfNeeded(url);
Request request = new Request.Builder()
.url(url)
.get()
.build();
try (Response response = client.newCall(request).execute()) {
if (response.body() != null) {
return response.body().string();
} else {
return null;
}
}
}
private static String addHttpSchemaIfNeeded(String url) {
if (!url.startsWith("http://") && !url.startsWith("https://")) {
return "http://" + url;
} else {
return url;
}
}
}
package com.tbyf.dataadapter.util;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.util.Map;
import java.util.Properties;
public class JsonUtils {
public static <T> T fromJson(String json, Class<T> type) {
Gson gson = new Gson();
return gson.fromJson(json, type);
}
public static <T> T fromJson(String json, TypeToken<T> typeToken) {
Gson gson = new Gson();
return gson.fromJson(json, typeToken);
}
public static String toJson(Object obj) {
Gson gson = new Gson();
return gson.toJson(obj);
}
}
package com.tbyf.dataadapter.util;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class NetUtils {
public static String localhost;
public static String getLocalHost() throws UnknownHostException {
if (localhost != null) {
return localhost;
}
synchronized (NetUtils.class) {
if (localhost != null) {
return localhost;
}
localhost = InetAddress.getLocalHost().getHostAddress();
return localhost;
}
}
}
package com.tbyf.dataadapter.util;
public class Utils {
public static int getCpuNums() {
return Runtime.getRuntime().availableProcessors();
}
}
package com.tbyf.dataadapter.util;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
/**
* a helper class that makes it easy to interact with zookeeper
*/
public class ZkUtils {
/**
* create an ephemeral ZNode at the given path holding the given data
*/
public static void createEphemeralZNode(CuratorFramework zkClient, String path, String data) {
createZNode(zkClient, path, true, data);
}
public static void createZNode(CuratorFramework zkClient, String path, boolean deleteOnDisconnect, String data) {
CreateMode createMode = deleteOnDisconnect ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
try {
zkClient.create()
.creatingParentsIfNeeded()
.withMode(createMode)
.forPath(path, data.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
public static void runWithinDistributedLock(CuratorFramework zkClient, String lockPath, Runnable task) {
InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
try {
lock.acquire();
task.run();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
lock.release();
} catch (Exception e) {
// TODO: how should a failed lock release be handled?
}
}
}
public static boolean exists(CuratorFramework zkClient, String path) {
Stat stat = null;
try {
stat = zkClient.checkExists().forPath(path);
} catch (Exception e) {
throw new IllegalStateException(e);
}
return stat != null;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tbyf.dataadapter</groupId>
<artifactId>dataadapter</artifactId>
<version>3.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>dataadapter-core</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.7.14</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.pf4j</groupId>
<artifactId>pf4j</artifactId>
<version>3.10.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>logging-interceptor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</project>