Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stop special directory log coll #561

Merged
merged 4 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
package com.xiaomi.mone.log.agent.channel;

import com.xiaomi.mone.log.agent.channel.memory.ChannelMemory;
import com.xiaomi.mone.log.agent.input.Input;
import com.xiaomi.mone.log.api.enums.LogTypeEnum;
import com.xiaomi.mone.log.api.model.meta.LogPattern;
import com.xiaomi.mone.log.utils.NetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -84,6 +86,11 @@ public ChannelState state() {

public abstract Long getLogCounts();

public LogTypeEnum getLogTypeEnum() {
Input input = getChannelDefine().getInput();
return LogTypeEnum.name2enum(input.getType());
}

/**
* Query IP information based on the actual collection path.
*
Expand All @@ -100,8 +107,7 @@ protected String getTailPodIp(String pattern) {
if (null != actualIpRel) {
return actualIpRel.getIp();
}
log.info("getTailPodIp empty,pattern:{}", pattern);
return StringUtils.EMPTY;
return NetUtil.getLocalIp();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,9 @@ public class ChannelDefine implements Serializable {
private Boolean singleMetaData;

private String podType;
/**
* 某个机器下线的时候需要删除的该目录下的日志采集,只有当某个应用的机器下线时才有值
*/
private String delDirectory;

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.xiaomi.mone.log.agent.filter.FilterChain;
import com.xiaomi.mone.log.agent.input.Input;
import com.xiaomi.mone.log.agent.output.Output;
import com.xiaomi.mone.log.api.enums.LogTypeEnum;
import com.xiaomi.mone.log.api.enums.OperateEnum;
import com.xiaomi.mone.log.api.model.vo.UpdateLogProcessCmd;
import com.xiaomi.mone.log.common.Constant;
Expand Down Expand Up @@ -419,9 +420,17 @@ private void updateConfig(List<ChannelDefine> channelDefines) {
* @param channelDefines
*/
private void deleteConfig(List<ChannelDefine> channelDefines, boolean directDel) {
// 指定目录下文件采集删除
delSpecialFileColl(channelDefines);
// 整个文件采集删除
delTailFileColl(channelDefines, directDel);
}

private void delTailFileColl(List<ChannelDefine> channelDefines, boolean directDel) {
List<ChannelDefine> channelDels = channelDefines.stream()
.filter(channelDefine -> null != channelDefine.getOperateEnum()
&& channelDefine.getOperateEnum().getCode().equals(OperateEnum.DELETE_OPERATE.getCode()))
&& channelDefine.getOperateEnum().getCode().equals(OperateEnum.DELETE_OPERATE.getCode())
&& StringUtils.isEmpty(channelDefine.getDelDirectory()))
.collect(Collectors.toList());
if (directDel) {
channelDels = channelDefines;
Expand Down Expand Up @@ -458,6 +467,48 @@ private void deleteConfig(List<ChannelDefine> channelDefines, boolean directDel)
}
}

/**
* 删除特定目录下的日志采集
*
* @param channelDefines
*/
private void delSpecialFileColl(List<ChannelDefine> channelDefines) {
//找出某个机器下线时需要删除的pod
List<ChannelDefine> delSpecialFiles = channelDefines.stream()
.filter(channelDefine -> null != channelDefine.getOperateEnum()
&& channelDefine.getOperateEnum().getCode().equals(OperateEnum.DELETE_OPERATE.getCode())
&& StringUtils.isNotEmpty(channelDefine.getDelDirectory()))
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(delSpecialFiles)) {
try {
for (ChannelService channelService : channelServiceList) {
CompletableFuture.runAsync(() -> {
AbstractChannelService abstractChannelService = (AbstractChannelService) channelService;
Long channelId = abstractChannelService.getChannelDefine().getChannelId();

List<ChannelDefine> defineList = delSpecialFiles.stream()
.filter(channelDefine -> Objects.equals(channelDefine.getChannelId(), channelId))
.collect(Collectors.toList());

for (ChannelDefine channelDefine : defineList) {
log.info("deleteConfig,deleteCollFile,channelDefine:{}", gson.toJson(channelDefine));
channelService.deleteCollFile(channelDefine.getDelDirectory());
}
//也需要删除opentelemetry日志
if (LogTypeEnum.OPENTELEMETRY == abstractChannelService.getLogTypeEnum()) {
for (ChannelDefine channelDefine : delSpecialFiles) {
log.info("deleteConfig OPENTELEMETRY,deleteCollFile,channelDefine:{}", gson.toJson(channelDefine));
channelService.deleteCollFile(channelDefine.getDelDirectory());
}
}
});
}
} catch (Exception e) {
log.error("delSpecialFileColl error,delSpecialFiles:{}", gson.toJson(channelDefines), e);
}
}
}

/**
* 差集
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,11 @@ public interface ChannelService extends Closeable {
* openteltry日志多文件结束clean
*/
void delayDeletionFinishedFile();

/**
* 删除某个目录的文件采集,适用于k8s中使用demonset方式部署时某个某个下线,需要删除它的采集,解除资源占用
*
* @param directory
*/
void deleteCollFile(String directory);
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,37 @@ public void delayDeletionFinishedFile() {
}
}

@Override
public void deleteCollFile(String directory) {
log.info("deleteCollFile,directory:{}", directory);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
log.error("deleteCollFile sleep error,directory:{}", directory, e);
}
for (Map.Entry<String, LogFile> logFileEntry : logFileMap.entrySet()) {
if (logFileEntry.getKey().contains(directory)) {
logFileEntry.getValue().setStop(true);
}
}
for (Map.Entry<String, Future> futureEntry : futureMap.entrySet()) {
if (futureEntry.getKey().contains(directory)) {
futureEntry.getValue().cancel(false);
}
}
for (Map.Entry<String, ScheduledFuture<?>> futureEntry : lastFileLineScheduledFutureMap.entrySet()) {
if (futureEntry.getKey().contains(directory)) {
futureEntry.getValue().cancel(false);
}
}
List<String> delFiles = reOpenMap.keySet().stream()
.filter(filePath -> filePath.contains(directory))
.collect(Collectors.toList());
for (String delFile : delFiles) {
reOpenMap.remove(delFile);
}
}

private void startExportQueueDataThread() {
scheduledFuture = ExecutorUtil.scheduleAtFixedRate(() -> {
// 超过10s 未发送mq消息,才进行异步发送
Expand Down Expand Up @@ -548,6 +579,7 @@ public void close() {
}
log.info("stop file monitor,fileName:", logFileMap.keySet().stream().collect(Collectors.joining(SYMBOL_COMMA)));
lineMessageList.clear();
reOpenMap.clear();
}

public Long getChannelId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@
import com.xiaomi.mone.log.agent.channel.ChannelDefine;
import com.xiaomi.mone.log.agent.channel.conf.AgentTailConf;
import com.xiaomi.mone.log.agent.factory.OutPutServiceFactory;
import com.xiaomi.mone.log.agent.output.Output;
import com.xiaomi.mone.log.agent.filter.FilterTrans;
import com.xiaomi.mone.log.agent.input.AppLogInput;
import com.xiaomi.mone.log.agent.output.Output;
import com.xiaomi.mone.log.api.enums.LogTypeEnum;
import com.xiaomi.mone.log.api.enums.MiddlewareEnum;
import com.xiaomi.mone.log.api.model.meta.*;
import com.xiaomi.mone.log.common.Constant;
import com.xiaomi.mone.log.utils.NetUtil;
Expand All @@ -41,7 +40,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.xiaomi.mone.log.common.Constant.*;
import static com.xiaomi.mone.log.common.Constant.GSON;
import static com.xiaomi.mone.log.common.Constant.SYMBOL_COLON;

/**
* rpc方式从log-manager获取channel元数据
Expand Down Expand Up @@ -112,6 +112,7 @@ public static AgentTailConf logCollectMeta2ChannelDefines(LogCollectMeta logColl
channelDefine.setPodNames(logCollectMeta.getPodNames());
channelDefine.setSingleMetaData(logCollectMeta.getSingleMetaData());
channelDefine.setPodType(logCollectMeta.getPodType());
channelDefine.setDelDirectory(logCollectMeta.getDelDirectory());

List<LogPattern> logPatternList = appLogMeta.getLogPatternList();
for (LogPattern logPattern : logPatternList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,10 @@ public class LogCollectMeta implements Serializable {
private Boolean singleMetaData;

private String podType;

/**
* 某个机器下线的时候需要删除的该目录下的日志采集,只有当某个应用的机器下线时才有值
*/
private String delDirectory;

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public interface MilogAgentService {

void publishIncrementDel(Long tailId, Long milogAppId, List<String> ips);

void delLogCollDirectoryByIp(Long tailId, String directory, List<String> ips);

Result<String> agentOfflineBatch(MilogAgentIpParam agentIpParam);

LogCollectMeta getLogCollectMetaFromManager(String ip);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@
import com.xiaomi.mone.log.common.Constant;
import com.xiaomi.mone.log.common.Result;
import com.xiaomi.mone.log.manager.common.Utils;
import com.xiaomi.mone.log.manager.dao.*;
import com.xiaomi.mone.log.manager.dao.MilogAppMiddlewareRelDao;
import com.xiaomi.mone.log.manager.dao.MilogLogTailDao;
import com.xiaomi.mone.log.manager.dao.MilogLogstoreDao;
import com.xiaomi.mone.log.manager.dao.MilogMiddlewareConfigDao;
import com.xiaomi.mone.log.manager.domain.LogProcess;
import com.xiaomi.mone.log.manager.model.bo.MilogAgentIpParam;
import com.xiaomi.mone.log.manager.model.pojo.*;
import com.xiaomi.mone.log.manager.model.pojo.MilogAppMiddlewareRel;
import com.xiaomi.mone.log.manager.model.pojo.MilogLogStoreDO;
import com.xiaomi.mone.log.manager.model.pojo.MilogLogTailDo;
import com.xiaomi.mone.log.manager.model.pojo.MilogMiddlewareConfig;
import com.xiaomi.mone.log.manager.service.env.HeraEnvIpService;
import com.xiaomi.mone.log.manager.service.env.HeraEnvIpServiceFactory;
import com.xiaomi.mone.log.manager.service.impl.HeraAppServiceImpl;
Expand Down Expand Up @@ -68,9 +74,6 @@ public class MilogAgentServiceImpl implements MilogAgentService {
@Resource
private HeraEnvIpServiceFactory heraEnvIpServiceFactory;

@Resource
private MilogAppTopicRelDao milogAppTopicRelDao;

@Resource
private MilogLogTailDao milogLogtailDao;

Expand Down Expand Up @@ -209,26 +212,46 @@ public void publishIncrementDel(Long tailId, Long milogAppId, List<String> ips)
log.info("删除配置同步到 logAgent,tailId:{},milogAppId:{},ips:{}", tailId, milogAppId, gson.toJson(ips));
AppLogMeta appLogMeta = new AppLogMeta();
LogPattern logPattern = new LogPattern();
MilogAppTopicRelDO appTopicRel = milogAppTopicRelDao.queryById(milogAppId);
assemblyAppInfo(milogAppId, appLogMeta);
logPattern.setLogtailId(tailId);
logPattern.setOperateEnum(OperateEnum.DELETE_OPERATE);
appLogMeta.setAppId(appTopicRel.getAppId());
appLogMeta.setAppName(appTopicRel.getAppName());

appLogMeta.setLogPatternList(Arrays.asList(logPattern));
ips.forEach(ip -> {
LogCollectMeta logCollectMeta = new LogCollectMeta();
logCollectMeta.setAgentIp(ip);
logCollectMeta.setAgentMachine("");
logCollectMeta.setAgentId("");
logCollectMeta.setAppLogMetaList(Arrays.asList(appLogMeta));
// todo 获取并设置agent全局filter
AgentDefine agentDefine = new AgentDefine();
agentDefine.setFilters(new ArrayList<>());
logCollectMeta.setAgentDefine(agentDefine);
sengConfigToAgent(ip, logCollectMeta);
});
}

private void assemblyAppInfo(Long milogAppId, AppLogMeta appLogMeta) {
AppBaseInfo appBaseInfo = heraAppService.queryById(milogAppId);
appLogMeta.setAppId(milogAppId);
if (null != appBaseInfo) {
appLogMeta.setAppName(appBaseInfo.getAppName());
}
}

@Override
public void delLogCollDirectoryByIp(Long tailId, String directory, List<String> ips) {
log.info("delLogCollDirectoryByIp logAgent,tailId:{},directory:{},ips:{}", tailId, directory, gson.toJson(ips));
AppLogMeta appLogMeta = new AppLogMeta();
LogPattern logPattern = new LogPattern();
logPattern.setLogtailId(tailId);
logPattern.setOperateEnum(OperateEnum.DELETE_OPERATE);
appLogMeta.setLogPatternList(Arrays.asList(logPattern));
LogCollectMeta logCollectMeta = new LogCollectMeta();
for (String ip : ips) {
logCollectMeta.setAgentIp(ip);
logCollectMeta.setDelDirectory(directory);
logCollectMeta.setAppLogMetaList(Arrays.asList(appLogMeta));
sengConfigToAgent(ip, logCollectMeta);
}
}

@Override
public Result<String> agentOfflineBatch(MilogAgentIpParam agentIpParam) {
if (null == agentIpParam || CollectionUtils.isEmpty(agentIpParam.getIps())) {
Expand Down Expand Up @@ -284,10 +307,8 @@ private LogCollectMeta buildLogCollectMeta(String agentIp) {
* @return
*/
private AppLogMeta assembleSingleConfig(Long milogAppId, List<LogPattern> logPatternList) {
MilogAppTopicRelDO appInfo = milogAppTopicRelDao.queryById(milogAppId);
AppLogMeta appLogMeta = new AppLogMeta();
appLogMeta.setAppId(appInfo.getAppId());
appLogMeta.setAppName(appInfo.getAppName());
assemblyAppInfo(milogAppId, appLogMeta);
appLogMeta.setLogPatternList(logPatternList);
return appLogMeta;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void updateSendMsg(MilogLogTailDo milogLogtailDo, List<String> oldIps, bo
// createConsumerGroup(milogLogtailDo.getSpaceId(), milogLogtailDo.getStoreId(), milogLogtailDo.getId(), milogMiddlewareConfigDao.queryById(middlewareRels.get(0).getMiddlewareId()), milogLogtailDo.getMilogAppId(), false);
logTailService.sengMessageToStream(milogLogtailDo, OperateEnum.UPDATE_OPERATE.getCode());
}
logTailService.compareChangeDelIps(milogLogtailDo.getId(), milogLogtailDo.getMilogAppId(), milogLogtailDo.getIps(), oldIps);
logTailService.compareChangeDelIps(milogLogtailDo.getId(), milogLogtailDo.getLogPath(), milogLogtailDo.getIps(), oldIps);
}

@Override
Expand Down
Loading