diff --git a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java index 9aad4501d..f557c4d83 100644 --- a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java +++ b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java @@ -4,7 +4,6 @@ import org.apache.http.Header; import org.apache.http.HttpHost; import org.apache.http.client.config.RequestConfig; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.message.BasicHeader; import org.elasticsearch.action.ActionListener; @@ -26,12 +25,14 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; -import org.elasticsearch.client.*; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.client.core.CountResponse; import org.elasticsearch.client.indices.*; import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer; -import org.elasticsearch.client.sniff.NodesSniffer; import org.elasticsearch.client.sniff.SniffOnFailureListener; import org.elasticsearch.client.sniff.Sniffer; import org.elasticsearch.common.unit.TimeValue; @@ -56,182 +57,151 @@ */ public class EsClient { - private static RestClientBuilder restClientBuilder; private static Sniffer sniffer; - private static final int TIME_OUT = 10 * 60 * 1000; private static final int SNIFF_INTERVAL_MILLIS = 30 * 1000; private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 30 * 1000; + private static final int MAX_CONN_PER_ROUTE = 500; + private static final int MAX_CONN_TOTAL = 500; + private static final int SOCKET_TIMEOUT_MS = 10 * 60 * 1000; + private static final int CONNECTION_REQUEST_TIMEOUT_MS = 5000 * 1000; + private static final int CONNECT_TIMEOUT_MS = 5000 * 1000; + private static final long KEEP_ALIVE_DURATION_MS = TimeUnit.MINUTES.toMillis(2); private RestHighLevelClient client; private RestClient restClient; -// public EsClient(String esAddr, String user, String pwd) { -// final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); -// credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, pwd)); -// RestClientBuilder builder = RestClient.builder(new HttpHost(esAddr.split(":")[0], Integer.valueOf(esAddr.split(":")[1]), "http")) -// .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { -// @Override -// public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) { -// return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); -// } -// }).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() { -// // 该方法接收一个RequestConfig.Builder对象,对该对象进行修改后然后返回。 -// @Override -// public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) { -// return builder.setConnectTimeout(5000 * 1000) // 连接超时(默认为1秒) -// .setSocketTimeout(6000 * 1000);// 套接字超时(默认为30秒)//更改客户端的超时限制默认30秒现在改为100*1000分钟 -// } -// });// 调整最大重试超时时间(默认为30秒).setMaxRetryTimeoutMillis(60000) -// this.client = new RestHighLevelClient(builder); -// -// } + public static boolean startedSniffer = true; + + private SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener(); public EsClient(String esAddr, String token, String catalog, String database) { - Header[] defaultHeaders = new Header[]{ + validateParams(esAddr, token, catalog, database); + + Header[] defaultHeaders = createDefaultHeaders(token, catalog, database); + + RestClientBuilder builder = createRestClientBuilder(esAddr, defaultHeaders); + + initializeHighLevelClient(builder); + + initializeSnifferIfNeeded(); + } + + private void validateParams(String esAddr, String token, String catalog, String database) { + if (esAddr == null || esAddr.isEmpty() || token == null || token.isEmpty() || catalog == null || catalog.isEmpty() || database == null || database.isEmpty()) { + throw new IllegalArgumentException("Invalid parameters provided"); + } + } + + private Header[] createDefaultHeaders(String token, String catalog, String database) { + return new Header[]{ new BasicHeader("Authorization", token), new BasicHeader("catalog", catalog), new BasicHeader("database", database) }; + } + + private void initializeSnifferIfNeeded() { + if (startedSniffer) { + initializeSniffer(); + } + } + + private RestClientBuilder createRestClientBuilder(String esAddr, Header[] defaultHeaders) { + String[] esAddrParts = esAddr.split(":"); + if (esAddrParts.length != 2) { + throw new IllegalArgumentException("Invalid Elasticsearch address"); + } + + String host = esAddrParts[0]; + int port = Integer.parseInt(esAddrParts[1]); - RestClientBuilder builder = RestClient.builder(new HttpHost(esAddr.split(":")[0], Integer.parseInt(esAddr.split(":")[1]), "http")) + return RestClient.builder(new HttpHost(host, port, "http")) .setDefaultHeaders(defaultHeaders) - .setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(500) - .setMaxConnTotal(500) - .setDefaultRequestConfig(RequestConfig.custom().setSocketTimeout(10 * 60 * 1000) - .setConnectionRequestTimeout(5000 * 1000) - .setConnectTimeout(5000 * 1000) + .setFailureListener(sniffOnFailureListener) + .setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(MAX_CONN_PER_ROUTE) + .setMaxConnTotal(MAX_CONN_TOTAL) + .setDefaultRequestConfig(RequestConfig.custom() + .setSocketTimeout(SOCKET_TIMEOUT_MS) + .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS) + .setConnectTimeout(CONNECT_TIMEOUT_MS) .build()) - .setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(2)) + .setKeepAliveStrategy((response, context) -> KEEP_ALIVE_DURATION_MS) .setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build())); - this.client = new RestHighLevelClient(builder); } + public EsClient(String esAddr, String user, String pwd) { + validateParams(esAddr, user, pwd); + + List hosts = createHttpHosts(esAddr); + + Header[] headers = createHeaders(user, pwd); + RestClientBuilder clientBuilder = createRestClientBuilder(hosts, headers); + + initializeHighLevelClient(clientBuilder); + + initializeSnifferIfNeeded(); + } + + private void validateParams(String esAddr, String user, String pwd) { + if (esAddr == null || esAddr.isEmpty() || user == null || user.isEmpty() || pwd == null || pwd.isEmpty()) { + throw new IllegalArgumentException("Invalid parameters provided"); + } + } + + private List createHttpHosts(String esAddr) { String[] addrs = esAddr.split(","); List hosts = new ArrayList<>(); for (String addr : addrs) { String[] hostAndPort = addr.split(":"); + String host = hostAndPort[0]; int port = Integer.parseInt(hostAndPort[1]); - HttpHost host = new HttpHost(hostAndPort[0], port); - hosts.add(host); + hosts.add(new HttpHost(host, port)); } + return hosts; + } + private Header[] createHeaders(String user, String pwd) { String urlEncodePassword = new String(Base64.getUrlEncoder().encode(String.format("%s:%s", user, pwd).getBytes())); String basicAuth = String.format("Basic %s", urlEncodePassword); - Header[] headers = new Header[]{new BasicHeader("Authorization", basicAuth), new BasicHeader("Content-Type", "application/json")}; + return new Header[]{new BasicHeader("Authorization", basicAuth), new BasicHeader("Content-Type", "application/json")}; + } - RestClientBuilder clientBuilder = RestClient.builder(hosts.toArray(new HttpHost[0])) + private RestClientBuilder createRestClientBuilder(List hosts, Header[] headers) { + return RestClient.builder(hosts.toArray(new HttpHost[0])) .setDefaultHeaders(headers) - .setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(500) - .setMaxConnTotal(500) - .setDefaultRequestConfig(RequestConfig.custom().setSocketTimeout(10 * 60 * 1000) - .setConnectionRequestTimeout(5000 * 1000) - .setConnectTimeout(5000 * 1000).build()) - .setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(2)) + .setFailureListener(sniffOnFailureListener) + .setFailureListener(sniffOnFailureListener) + .setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(MAX_CONN_PER_ROUTE) + .setMaxConnTotal(MAX_CONN_TOTAL) + .setDefaultRequestConfig(RequestConfig.custom() + .setSocketTimeout(SOCKET_TIMEOUT_MS) + .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS) + .setConnectTimeout(CONNECT_TIMEOUT_MS) + .build()) + .setKeepAliveStrategy((response, context) -> KEEP_ALIVE_DURATION_MS) .setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build())); - this.client = new RestHighLevelClient(clientBuilder); - } - public EsClient(List restAddress, int httpPort, String username, String password, int timeOut, int snifferIntervalMillis, int snifferAfterFailDelayMillis) throws IOException { - snifferNodeInit(restAddress, httpPort, username, password, timeOut, snifferIntervalMillis, snifferAfterFailDelayMillis); - } - - public EsClient(List restAddress, int httpPort, String username, String password) throws IOException { - snifferNodeInit(restAddress, httpPort, username, password, TIME_OUT, SNIFF_INTERVAL_MILLIS, SNIFF_AFTER_FAILURE_DELAY_MILLIS); + private void initializeHighLevelClient(RestClientBuilder clientBuilder) { + this.client = new RestHighLevelClient(clientBuilder); + this.restClient = client.getLowLevelClient(); } - private void snifferNodeInit(List restAddress, int httpPort, String username, String password, int timeOut, int snifferIntervalMillis, int snifferAfterFailDelayMillis) throws IOException { - - - HttpHost[] hosts = new HttpHost[restAddress.size()]; - for (int index = 0; index < restAddress.size(); index++) { - hosts[index] = new HttpHost(restAddress.get(index), httpPort, "http"); - } - - RestClientBuilder.RequestConfigCallback requestConfigCallback = new RestClientBuilder.RequestConfigCallback() { - @Override - public RequestConfig.Builder customizeRequestConfig( - RequestConfig.Builder requestConfigBuilder) { - return requestConfigBuilder - .setConnectTimeout(timeOut) - .setSocketTimeout(timeOut); - } - }; - - RestClientBuilder.HttpClientConfigCallback httpClientConfigCallback = new RestClientBuilder.HttpClientConfigCallback() { - @Override - public HttpAsyncClientBuilder customizeHttpClient( - HttpAsyncClientBuilder httpClientBuilder) { - RequestConfig.Builder requestConfigBuilder = RequestConfig.custom() - .setConnectTimeout(timeOut) - .setSocketTimeout(timeOut) - .setConnectionRequestTimeout(timeOut); - httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build()); - return httpClientBuilder; - } - }; - - SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener(); - if (username != null && password != null) { - String token = "Basic " + new String(Base64.getUrlEncoder().encode((username + ":" + password).getBytes())); - Header[] tokenHeader = new Header[]{new BasicHeader("Authorization", token)}; - restClientBuilder = RestClient.builder(hosts).setNodeSelector(SKIP_DEDICATED_NODES) - .setFailureListener(sniffOnFailureListener) - .setHttpClientConfigCallback(httpClientConfigCallback) - .setRequestConfigCallback(requestConfigCallback) - .setDefaultHeaders(tokenHeader); - } else { - restClientBuilder = RestClient.builder(hosts).setNodeSelector(SKIP_DEDICATED_NODES) - .setFailureListener(sniffOnFailureListener) - .setRequestConfigCallback(requestConfigCallback) - .setHttpClientConfigCallback(httpClientConfigCallback); - } - - client = new RestHighLevelClient(restClientBuilder); - restClient = client.getLowLevelClient(); - - NodesSniffer elasticsearchNodesSniffer = new ElasticsearchNodesSniffer( - restClient, - TimeUnit.SECONDS.toMillis(5), - ElasticsearchNodesSniffer.Scheme.HTTP); - - // important + private void initializeSniffer() { sniffer = Sniffer.builder(restClient) - .setSniffIntervalMillis(snifferIntervalMillis) - .setSniffAfterFailureDelayMillis(snifferAfterFailDelayMillis) - .setNodesSniffer(elasticsearchNodesSniffer) + .setSniffIntervalMillis(SNIFF_INTERVAL_MILLIS) + .setSniffAfterFailureDelayMillis(SNIFF_AFTER_FAILURE_DELAY_MILLIS) + .setNodesSniffer(new ElasticsearchNodesSniffer( + restClient, + TimeUnit.SECONDS.toMillis(5), + ElasticsearchNodesSniffer.Scheme.HTTP)) .build(); sniffOnFailureListener.setSniffer(sniffer); } - // important - private NodeSelector SKIP_DEDICATED_NODES = new NodeSelector() { - @Override - public void select(Iterable nodes) { - for (Iterator itr = nodes.iterator(); itr.hasNext(); ) { - Node node = itr.next(); - if (node.getRoles() == null) continue; - if ((node.getRoles().isMasterEligible() - && false == node.getRoles().isData() - && false == node.getRoles().isIngest()) - || - (node.getAttributes().containsKey("node_type") - && node.getAttributes().get("node_type").contains("client") - && false == node.getRoles().isData())) { - itr.remove(); - } - } - } - - @Override - public String toString() { - return "SKIP_DEDICATED_NODES"; - } - }; - public SearchResponse search(SearchRequest searchRequest) throws IOException { SearchResponse res = this.client.search(searchRequest, RequestOptions.DEFAULT); return res; @@ -407,7 +377,7 @@ public EsRet dateHistogram(String indexName, String field, String interval, long .extendedBounds(new LongBounds(startTime, endTime));//统计范围 SearchRequest searchRequest = new SearchRequest(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(builder).aggregation(aggregationBuilder).size(0).timeout(TimeValue.timeValueSeconds(10l)); + searchSourceBuilder.query(builder).aggregation(aggregationBuilder).size(0).timeout(TimeValue.timeValueSeconds(10L)); searchRequest.source(searchSourceBuilder); searchRequest.indices(indexName); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java index aa3a841b2..f7add647a 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java @@ -84,48 +84,57 @@ public void reg(String path, Predicate predicate) throws IOException, In WatchService watchService = FileSystems.getDefault().newWatchService(); directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_CREATE); while (true) { - WatchKey key = watchService.take(); - for (WatchEvent event : key.pollEvents()) { - Path modifiedFile = (Path) event.context(); - String filePath = String.format("%s%s", path, modifiedFile.getFileName().toString()); - if (!predicate.test(filePath) || modifiedFile.getFileName().toString().startsWith(".")) { - continue; - } - HeraFile hfile = fileMap.get(filePath); - - if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) { - if (null == hfile) { - hfile = initFile(new File(filePath)); - } - modify(hfile); - } - - if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) { - fileMap.remove(filePath); - if (null != hfile) { - map.remove(hfile.getFileKey()); - listener.onEvent(FileEvent.builder().type(EventType.delete).fileName(filePath).fileKey(hfile.getFileKey()).build()); - } - } - - if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) { - File file = new File(filePath); - Object k = FileUtils.fileKey(file); - - if (map.containsKey(k)) { - log.info("change name " + map.get(k) + "--->" + file); - listener.onEvent(FileEvent.builder().fileKey(k).type(EventType.rename).build()); - } else { - log.info("ENTRY_CREATE filePath:{},fileKey:{}", filePath, k); - HeraFile hf = HeraFile.builder().file(file).fileKey(k).fileName(filePath).build(); - map.putIfAbsent(k, hf); - fileMap.put(filePath, hf); - - listener.onEvent(FileEvent.builder().type(EventType.create).fileName(file.getPath()).build()); + try { + WatchKey key = watchService.take(); + try { + for (WatchEvent event : key.pollEvents()) { + Path modifiedFile = (Path) event.context(); + String filePath = String.format("%s%s", path, modifiedFile.getFileName().toString()); + if (!predicate.test(filePath) || modifiedFile.getFileName().toString().startsWith(".")) { + continue; + } + HeraFile hfile = fileMap.get(filePath); + + if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) { + if (null == hfile) { + hfile = initFile(new File(filePath)); + } + modify(hfile); + } + + if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) { + fileMap.remove(filePath); + if (null != hfile) { + map.remove(hfile.getFileKey()); + listener.onEvent(FileEvent.builder().type(EventType.delete).fileName(filePath).fileKey(hfile.getFileKey()).build()); + } + } + + if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) { + File file = new File(filePath); + Object k = FileUtils.fileKey(file); + + if (map.containsKey(k)) { + log.info("change name " + map.get(k) + "--->" + file); + listener.onEvent(FileEvent.builder().fileKey(k).type(EventType.rename).build()); + } else { + log.info("ENTRY_CREATE filePath:{},fileKey:{}", filePath, k); + HeraFile hf = HeraFile.builder().file(file).fileKey(k).fileName(filePath).build(); + map.putIfAbsent(k, hf); + fileMap.put(filePath, hf); + + listener.onEvent(FileEvent.builder().type(EventType.create).fileName(file.getPath()).build()); + } + } } + } catch (Exception e1) { + log.error("watchService poll events error", e1); + } finally { + key.reset(); } + } catch (Exception e) { + log.error("watchService error", e); } - key.reset(); } } @@ -165,11 +174,13 @@ private HeraFile initFile(File it) { private void modify(HeraFile hfile) { - hfile.getUtime().set(System.currentTimeMillis()); - if (hfile.getFile().length() == 0) { - listener.onEvent(FileEvent.builder().type(EventType.empty).fileName(hfile.getFileName()).fileKey(hfile.getFileKey()).build()); - } else { - listener.onEvent(FileEvent.builder().type(EventType.modify).build()); + if (null != hfile) { + hfile.getUtime().set(System.currentTimeMillis()); + if (hfile.getFile().length() == 0) { + listener.onEvent(FileEvent.builder().type(EventType.empty).fileName(hfile.getFileName()).fileKey(hfile.getFileKey()).build()); + } else { + listener.onEvent(FileEvent.builder().type(EventType.modify).build()); + } } } diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java index b0882d720..363f59624 100644 --- a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java @@ -87,7 +87,7 @@ public void testLogFileMonitor() { @Test public void testLogWS() throws IOException { - LogFileWS log = new LogFileWS("D:\\t", new ReadListener() { + LogFile log = new LogFile("D:\\test.log", new ReadListener() { @Override public void onEvent(ReadEvent event) { System.out.println(event.getReadResult().getLines()); @@ -132,4 +132,5 @@ public void testReadFileCutting() throws IOException { log.readLine(); System.in.read(); } + }