Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: update es and upgrade file #848

Merged
merged 1 commit into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
public class EsClient {

private static Sniffer sniffer;
private static final int SNIFF_INTERVAL_MILLIS = 30 * 1000;
private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 30 * 1000;
private static final int SNIFF_INTERVAL_MILLIS = 60 * 1000 * 3;
private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 60 * 1000;
private static final int MAX_CONN_PER_ROUTE = 500;
private static final int MAX_CONN_TOTAL = 500;
private static final int SOCKET_TIMEOUT_MS = 10 * 60 * 1000;
Expand Down Expand Up @@ -196,7 +196,7 @@ private void initializeSniffer() {
.setSniffAfterFailureDelayMillis(SNIFF_AFTER_FAILURE_DELAY_MILLIS)
.setNodesSniffer(new ElasticsearchNodesSniffer(
restClient,
TimeUnit.SECONDS.toMillis(5),
TimeUnit.SECONDS.toMillis(60),
ElasticsearchNodesSniffer.Scheme.HTTP))
.build();
sniffOnFailureListener.setSniffer(sniffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,9 @@ public void getClusterHealth() throws IOException {

@Test
public void queryIndexMetadataTest() throws IOException {
GetMappingsResponse metadata = client.queryIndexMapping("zgq_common_milog_staging_app_private_1");
System.out.println(String.format("result:%s", gson.toJson(metadata)));
GetMappingsResponse metadata = client.queryIndexMapping("test_scf_log_index");
// Map<String, MappingMetadata> mappings = metadata.mappings();
// System.out.println(String.format("result:%s", gson.toJson(metadata)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void bulkInsert() throws InterruptedException {

NacosConfig config = new NacosConfig();
config.setDataId("zzy_new");
config.init();
// config.init();

String ip = config.getConfig("es_ip");
String user = config.getConfig("es_user");
Expand Down Expand Up @@ -53,13 +53,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
int count = 0;
while (true) {
// processor.bulkInsert(indexName, data);
processor.bulkUpsert(indexName, "YpzPE4UBt3Uy5NFQ1V5e", data);
processor.bulkInsert(indexName, data);
count++;
if (count == n) {
break;
}
}
Thread.sleep(10000l);
// Thread.sleep(10000l);
System.in.read();
}catch (Exception e){
e.printStackTrace();
}
Expand Down
7 changes: 7 additions & 0 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,12 @@ public interface ILogFile {

void initLogFile(String file, ReadListener listener, long pointer, long lineNumber);

/**
* It only needs to be called when an exception occurs and can only be called externally.
*/
void setExceptionFinish();

boolean getExceptionFinish();


}
12 changes: 12 additions & 0 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class LogFile implements ILogFile {
@Setter
private volatile boolean reFresh;

private volatile boolean exceptionFinish;

@Getter
private int beforePointerHashCode;

Expand Down Expand Up @@ -171,6 +173,16 @@ public void initLogFile(String file, ReadListener listener, long pointer, long l
this.lineNumber = lineNumber;
}

@Override
public void setExceptionFinish() {
exceptionFinish = true;
}

@Override
public boolean getExceptionFinish() {
return exceptionFinish;
}

private String lineCutOff(String line) {
if (null != line) {
//todo 大行文件先临时截断
Expand Down
20 changes: 17 additions & 3 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class LogFile2 implements ILogFile {
@Setter
private volatile boolean reFresh;

private volatile boolean exceptionFinish;

@Getter
private int beforePointerHashCode;

Expand Down Expand Up @@ -110,20 +112,21 @@ private void open() {
}
}

@Override
public void readLine() throws IOException {
while (true) {
open();
//兼容文件切换时,缓存的pointer
try {
log.info("open file:{},pointer:{}", file, this.pointer);
log.info("open file:{},pointer:{},lineNumber:{},", file, this.pointer, this.lineNumber);
if (pointer > raf.length()) {
pointer = 0;
lineNumber = 0;
}
} catch (Exception e) {
log.error("file.length() IOException, file:{}", this.file, e);
}
log.info("rel open file:{},pointer:{}", file, this.pointer);
log.info("rel open file:{},pointer:{},lineNumber:{}", file, this.pointer, this.lineNumber);
raf.seek(pointer);

while (true) {
Expand Down Expand Up @@ -166,6 +169,7 @@ public void readLine() throws IOException {
}

if (listener.isBreak(line)) {
log.info("isBreak:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, this.fileKey);
stop = true;
break;
}
Expand Down Expand Up @@ -193,7 +197,7 @@ public void readLine() throws IOException {
}
raf.close();
if (stop) {
log.info("stop:{},pointer:{},fileKey:{}", this.file, this.pointer, this.fileKey);
log.info("stop:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, this.fileKey);
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).fileName(this.file).build());
break;
}
Expand All @@ -209,6 +213,16 @@ public void initLogFile(String file, ReadListener listener, long pointer, long l
this.lineNumber = lineNumber;
}

@Override
public void setExceptionFinish() {
exceptionFinish = true;
}

@Override
public boolean getExceptionFinish() {
return exceptionFinish;
}

private String lineCutOff(String line) {
if (null != line) {
//todo 大行文件先临时截断
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ public void testLogFileMonitor() {
monitor.setListener(new DefaultMonitorListener(monitor, readEvent -> {
System.out.println(readEvent.getReadResult().getLines());
}));
String fileName = "/home/work/log/test/provider/server.log.*";
String fileName = "/home/work/log/file.log.*";
Pattern pattern = Pattern.compile(fileName);
monitor.reg("/home/work/log/test/provider/", it -> {
monitor.reg("/home/work/log", it -> {
boolean matches = pattern.matcher(it).matches();
log.info("file:{},matches:{}", it, matches);
return matches;
return true;
});
log.info("reg finish");
System.in.read();
Expand Down
Loading