Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Log Collection and Monitoring bug #825

Merged
merged 4 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 107 additions & 137 deletions jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.ActionListener;
Expand All @@ -26,12 +25,14 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
import org.elasticsearch.client.sniff.NodesSniffer;
import org.elasticsearch.client.sniff.SniffOnFailureListener;
import org.elasticsearch.client.sniff.Sniffer;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -56,182 +57,151 @@
*/
public class EsClient {

private static RestClientBuilder restClientBuilder;
private static Sniffer sniffer;
private static final int TIME_OUT = 10 * 60 * 1000;
private static final int SNIFF_INTERVAL_MILLIS = 30 * 1000;
private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 30 * 1000;
private static final int MAX_CONN_PER_ROUTE = 500;
private static final int MAX_CONN_TOTAL = 500;
private static final int SOCKET_TIMEOUT_MS = 10 * 60 * 1000;
private static final int CONNECTION_REQUEST_TIMEOUT_MS = 5000 * 1000;
private static final int CONNECT_TIMEOUT_MS = 5000 * 1000;
private static final long KEEP_ALIVE_DURATION_MS = TimeUnit.MINUTES.toMillis(2);

private RestHighLevelClient client;

private RestClient restClient;

// public EsClient(String esAddr, String user, String pwd) {
// final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, pwd));
// RestClientBuilder builder = RestClient.builder(new HttpHost(esAddr.split(":")[0], Integer.valueOf(esAddr.split(":")[1]), "http"))
// .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
// @Override
// public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
// return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
// }
// }).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
// // 该方法接收一个RequestConfig.Builder对象,对该对象进行修改后然后返回。
// @Override
// public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
// return builder.setConnectTimeout(5000 * 1000) // 连接超时(默认为1秒)
// .setSocketTimeout(6000 * 1000);// 套接字超时(默认为30秒)//更改客户端的超时限制默认30秒现在改为100*1000分钟
// }
// });// 调整最大重试超时时间(默认为30秒).setMaxRetryTimeoutMillis(60000)
// this.client = new RestHighLevelClient(builder);
//
// }
public static boolean startedSniffer = true;

private SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();

public EsClient(String esAddr, String token, String catalog, String database) {
Header[] defaultHeaders = new Header[]{
validateParams(esAddr, token, catalog, database);

Header[] defaultHeaders = createDefaultHeaders(token, catalog, database);

RestClientBuilder builder = createRestClientBuilder(esAddr, defaultHeaders);

initializeHighLevelClient(builder);

initializeSnifferIfNeeded();
}

private void validateParams(String esAddr, String token, String catalog, String database) {
if (esAddr == null || esAddr.isEmpty() || token == null || token.isEmpty() || catalog == null || catalog.isEmpty() || database == null || database.isEmpty()) {
throw new IllegalArgumentException("Invalid parameters provided");
}
}

private Header[] createDefaultHeaders(String token, String catalog, String database) {
return new Header[]{
new BasicHeader("Authorization", token),
new BasicHeader("catalog", catalog),
new BasicHeader("database", database)
};
}

private void initializeSnifferIfNeeded() {
if (startedSniffer) {
initializeSniffer();
}
}

private RestClientBuilder createRestClientBuilder(String esAddr, Header[] defaultHeaders) {
String[] esAddrParts = esAddr.split(":");
if (esAddrParts.length != 2) {
throw new IllegalArgumentException("Invalid Elasticsearch address");
}

String host = esAddrParts[0];
int port = Integer.parseInt(esAddrParts[1]);

RestClientBuilder builder = RestClient.builder(new HttpHost(esAddr.split(":")[0], Integer.parseInt(esAddr.split(":")[1]), "http"))
return RestClient.builder(new HttpHost(host, port, "http"))
.setDefaultHeaders(defaultHeaders)
.setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(500)
.setMaxConnTotal(500)
.setDefaultRequestConfig(RequestConfig.custom().setSocketTimeout(10 * 60 * 1000)
.setConnectionRequestTimeout(5000 * 1000)
.setConnectTimeout(5000 * 1000)
.setFailureListener(sniffOnFailureListener)
.setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(MAX_CONN_PER_ROUTE)
.setMaxConnTotal(MAX_CONN_TOTAL)
.setDefaultRequestConfig(RequestConfig.custom()
.setSocketTimeout(SOCKET_TIMEOUT_MS)
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
.setConnectTimeout(CONNECT_TIMEOUT_MS)
.build())
.setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(2))
.setKeepAliveStrategy((response, context) -> KEEP_ALIVE_DURATION_MS)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build()));
this.client = new RestHighLevelClient(builder);
}


public EsClient(String esAddr, String user, String pwd) {
validateParams(esAddr, user, pwd);

List<HttpHost> hosts = createHttpHosts(esAddr);

Header[] headers = createHeaders(user, pwd);

RestClientBuilder clientBuilder = createRestClientBuilder(hosts, headers);

initializeHighLevelClient(clientBuilder);

initializeSnifferIfNeeded();
}

private void validateParams(String esAddr, String user, String pwd) {
if (esAddr == null || esAddr.isEmpty() || user == null || user.isEmpty() || pwd == null || pwd.isEmpty()) {
throw new IllegalArgumentException("Invalid parameters provided");
}
}

private List<HttpHost> createHttpHosts(String esAddr) {
String[] addrs = esAddr.split(",");
List<HttpHost> hosts = new ArrayList<>();
for (String addr : addrs) {
String[] hostAndPort = addr.split(":");
String host = hostAndPort[0];
int port = Integer.parseInt(hostAndPort[1]);
HttpHost host = new HttpHost(hostAndPort[0], port);
hosts.add(host);
hosts.add(new HttpHost(host, port));
}
return hosts;
}

private Header[] createHeaders(String user, String pwd) {
String urlEncodePassword = new String(Base64.getUrlEncoder().encode(String.format("%s:%s", user, pwd).getBytes()));
String basicAuth = String.format("Basic %s", urlEncodePassword);
Header[] headers = new Header[]{new BasicHeader("Authorization", basicAuth), new BasicHeader("Content-Type", "application/json")};
return new Header[]{new BasicHeader("Authorization", basicAuth), new BasicHeader("Content-Type", "application/json")};
}

RestClientBuilder clientBuilder = RestClient.builder(hosts.toArray(new HttpHost[0]))
private RestClientBuilder createRestClientBuilder(List<HttpHost> hosts, Header[] headers) {
return RestClient.builder(hosts.toArray(new HttpHost[0]))
.setDefaultHeaders(headers)
.setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(500)
.setMaxConnTotal(500)
.setDefaultRequestConfig(RequestConfig.custom().setSocketTimeout(10 * 60 * 1000)
.setConnectionRequestTimeout(5000 * 1000)
.setConnectTimeout(5000 * 1000).build())
.setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(2))
.setFailureListener(sniffOnFailureListener)
.setFailureListener(sniffOnFailureListener)
.setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(MAX_CONN_PER_ROUTE)
.setMaxConnTotal(MAX_CONN_TOTAL)
.setDefaultRequestConfig(RequestConfig.custom()
.setSocketTimeout(SOCKET_TIMEOUT_MS)
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
.setConnectTimeout(CONNECT_TIMEOUT_MS)
.build())
.setKeepAliveStrategy((response, context) -> KEEP_ALIVE_DURATION_MS)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build()));
this.client = new RestHighLevelClient(clientBuilder);

}

public EsClient(List<String> restAddress, int httpPort, String username, String password, int timeOut, int snifferIntervalMillis, int snifferAfterFailDelayMillis) throws IOException {
snifferNodeInit(restAddress, httpPort, username, password, timeOut, snifferIntervalMillis, snifferAfterFailDelayMillis);
}

public EsClient(List<String> restAddress, int httpPort, String username, String password) throws IOException {
snifferNodeInit(restAddress, httpPort, username, password, TIME_OUT, SNIFF_INTERVAL_MILLIS, SNIFF_AFTER_FAILURE_DELAY_MILLIS);
private void initializeHighLevelClient(RestClientBuilder clientBuilder) {
this.client = new RestHighLevelClient(clientBuilder);
this.restClient = client.getLowLevelClient();
}

private void snifferNodeInit(List<String> restAddress, int httpPort, String username, String password, int timeOut, int snifferIntervalMillis, int snifferAfterFailDelayMillis) throws IOException {


HttpHost[] hosts = new HttpHost[restAddress.size()];
for (int index = 0; index < restAddress.size(); index++) {
hosts[index] = new HttpHost(restAddress.get(index), httpPort, "http");
}

RestClientBuilder.RequestConfigCallback requestConfigCallback = new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(
RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder
.setConnectTimeout(timeOut)
.setSocketTimeout(timeOut);
}
};

RestClientBuilder.HttpClientConfigCallback httpClientConfigCallback = new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectTimeout(timeOut)
.setSocketTimeout(timeOut)
.setConnectionRequestTimeout(timeOut);
httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
return httpClientBuilder;
}
};

SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
if (username != null && password != null) {
String token = "Basic " + new String(Base64.getUrlEncoder().encode((username + ":" + password).getBytes()));
Header[] tokenHeader = new Header[]{new BasicHeader("Authorization", token)};
restClientBuilder = RestClient.builder(hosts).setNodeSelector(SKIP_DEDICATED_NODES)
.setFailureListener(sniffOnFailureListener)
.setHttpClientConfigCallback(httpClientConfigCallback)
.setRequestConfigCallback(requestConfigCallback)
.setDefaultHeaders(tokenHeader);
} else {
restClientBuilder = RestClient.builder(hosts).setNodeSelector(SKIP_DEDICATED_NODES)
.setFailureListener(sniffOnFailureListener)
.setRequestConfigCallback(requestConfigCallback)
.setHttpClientConfigCallback(httpClientConfigCallback);
}

client = new RestHighLevelClient(restClientBuilder);
restClient = client.getLowLevelClient();

NodesSniffer elasticsearchNodesSniffer = new ElasticsearchNodesSniffer(
restClient,
TimeUnit.SECONDS.toMillis(5),
ElasticsearchNodesSniffer.Scheme.HTTP);

// important
private void initializeSniffer() {
sniffer = Sniffer.builder(restClient)
.setSniffIntervalMillis(snifferIntervalMillis)
.setSniffAfterFailureDelayMillis(snifferAfterFailDelayMillis)
.setNodesSniffer(elasticsearchNodesSniffer)
.setSniffIntervalMillis(SNIFF_INTERVAL_MILLIS)
.setSniffAfterFailureDelayMillis(SNIFF_AFTER_FAILURE_DELAY_MILLIS)
.setNodesSniffer(new ElasticsearchNodesSniffer(
restClient,
TimeUnit.SECONDS.toMillis(5),
ElasticsearchNodesSniffer.Scheme.HTTP))
.build();
sniffOnFailureListener.setSniffer(sniffer);
}

// important
private NodeSelector SKIP_DEDICATED_NODES = new NodeSelector() {
@Override
public void select(Iterable<Node> nodes) {
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext(); ) {
Node node = itr.next();
if (node.getRoles() == null) continue;
if ((node.getRoles().isMasterEligible()
&& false == node.getRoles().isData()
&& false == node.getRoles().isIngest())
||
(node.getAttributes().containsKey("node_type")
&& node.getAttributes().get("node_type").contains("client")
&& false == node.getRoles().isData())) {
itr.remove();
}
}
}

@Override
public String toString() {
return "SKIP_DEDICATED_NODES";
}
};

public SearchResponse search(SearchRequest searchRequest) throws IOException {
SearchResponse res = this.client.search(searchRequest, RequestOptions.DEFAULT);
return res;
Expand Down Expand Up @@ -407,7 +377,7 @@ public EsRet dateHistogram(String indexName, String field, String interval, long
.extendedBounds(new LongBounds(startTime, endTime));//统计范围
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(builder).aggregation(aggregationBuilder).size(0).timeout(TimeValue.timeValueSeconds(10l));
searchSourceBuilder.query(builder).aggregation(aggregationBuilder).size(0).timeout(TimeValue.timeValueSeconds(10L));
searchRequest.source(searchSourceBuilder);
searchRequest.indices(indexName);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
Expand Down
Loading
Loading