Skip to content

Commit

Permalink
Support datastreams as an AuditLog Sink (#4257)
Browse files Browse the repository at this point in the history
Signed-off-by: tmanninger <[email protected]>
  • Loading branch information
tmanninger authored Sep 26, 2024
1 parent a62e99a commit 9c5e32a
Show file tree
Hide file tree
Showing 7 changed files with 498 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,50 @@ public List<Setting<?>> getSettings() {
)
);

// Internal OpenSearch DataStream
settings.add(
Setting.simpleString(
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX + ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_NAME,
Property.NodeScope,
Property.Filtered
)
);
settings.add(
Setting.boolSetting(
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX
+ ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_MANAGE,
true,
Property.NodeScope,
Property.Filtered
)
);
settings.add(
Setting.simpleString(
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX
+ ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NAME,
Property.NodeScope,
Property.Filtered
)
);
settings.add(
Setting.intSetting(
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX
+ ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_SHARDS,
1,
Property.NodeScope,
Property.Filtered
)
);
settings.add(
Setting.intSetting(
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX
+ ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_REPLICAS,
0,
Property.NodeScope,
Property.Filtered
)
);

// External OpenSearch
settings.add(
Setting.listSetting(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.security.auditlog.sink;

import java.io.IOException;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.support.ConfigConstants;
import org.opensearch.security.support.HeaderHelper;
import org.opensearch.threadpool.ThreadPool;

public abstract class AbstractInternalOpenSearchSink extends AuditLogSink {

protected final Client clientProvider;
private final ThreadPool threadPool;
private final DocWriteRequest.OpType storeOpType;

public AbstractInternalOpenSearchSink(
final String name,
final Settings settings,
final String settingsPrefix,
final Client clientProvider,
ThreadPool threadPool,
AuditLogSink fallbackSink,
DocWriteRequest.OpType storeOpType
) {
super(name, settings, settingsPrefix, fallbackSink);
this.clientProvider = clientProvider;
this.threadPool = threadPool;
this.storeOpType = storeOpType;
}

@Override
public void close() throws IOException {

}

public boolean doStore(final AuditMessage msg, String indexName) {

if (Boolean.parseBoolean(
HeaderHelper.getSafeFromHeader(threadPool.getThreadContext(), ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER)
)) {
if (log.isTraceEnabled()) {
log.trace("audit log of audit log will not be executed");
}
return true;
}

try (StoredContext ctx = threadPool.getThreadContext().stashContext()) {
try {
final IndexRequestBuilder irb = clientProvider.prepareIndex(indexName)
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setSource(msg.getAsMap());
threadPool.getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER, "true");
irb.setTimeout(TimeValue.timeValueMinutes(1));
if (this.storeOpType != null) {
irb.setOpType(this.storeOpType);
}
irb.execute().actionGet();
return true;
} catch (final Exception e) {
log.error("Unable to index audit log {} due to", msg, e);
return false;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.security.auditlog.sink;

// CS-SUPPRESS-SINGLE: RegexpSingleline https://github.com/opensearch-project/OpenSearch/issues/3663
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;

import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.opensearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.cluster.metadata.Template;
import org.opensearch.common.settings.Settings;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.support.ConfigConstants;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;

public final class InternalOpenSearchDataStreamSink extends AbstractInternalOpenSearchSink {

String dataStreamName;
private boolean dataStreamInitialized = false;

public InternalOpenSearchDataStreamSink(
final String name,
final Settings settings,
final String settingsPrefix,
final Path configPath,
final Client clientProvider,
ThreadPool threadPool,
AuditLogSink fallbackSink
) {
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, DocWriteRequest.OpType.CREATE);
Settings sinkSettings = getSinkSettings(settingsPrefix);

this.dataStreamName = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_NAME, "opensearch-security-auditlog");

// Node is no ready yet... this.initDataStream() must be called later (in method doStore())
}

private boolean initDataStream() {

if (this.dataStreamInitialized) {
return true;
}

Settings sinkSettings = getSinkSettings(settingsPrefix);

final boolean templateManage = sinkSettings.getAsBoolean(
ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_MANAGE,
true
);

// Create datastream template
if (templateManage) {

final String templateName = sinkSettings.get(
ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NAME,
"opensearch-security-auditlog"
);
final Integer numberOfReplicas = sinkSettings.getAsInt(
ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_REPLICAS,
0
);
final Integer numberOfShards = sinkSettings.getAsInt(
ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_SHARDS,
1
);

ComposableIndexTemplate template = new ComposableIndexTemplate(
List.of(dataStreamName),
new Template(
Settings.builder().put("number_of_shards", numberOfShards).put("number_of_replicas", numberOfReplicas).build(),
null,
null
),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(new DataStream.TimestampField("@timestamp"))
);

try {
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(templateName);
request.indexTemplate(template);
AcknowledgedResponse response = clientProvider.execute(PutComposableIndexTemplateAction.INSTANCE, request).get();
if (!response.isAcknowledged()) {
log.error("Failed to create index template {}", templateName);
return false;
}
} catch (final Exception e) {
log.error("Cannot create index template {} due to", templateName, e);
return false;
}
}

CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
try {
AcknowledgedResponse response = clientProvider.admin().indices().createDataStream(createDataStreamRequest).get();
if (!response.isAcknowledged()) {
log.error("Failed to create datastream {}", dataStreamName);
}
this.dataStreamInitialized = true;
} catch (final Exception e) {
if (e.getCause() instanceof ResourceAlreadyExistsException
|| (e.getCause() instanceof RemoteTransportException
&& e.getCause().getCause() instanceof ResourceAlreadyExistsException)) {
log.trace("Datastream {} already exists", dataStreamName);
this.dataStreamInitialized = true;
} else {
log.error("Cannot create datastream {} due to", dataStreamName, e);
return false;
}
}

return this.dataStreamInitialized;
}

@Override
public void close() throws IOException {

}

public boolean doStore(final AuditMessage msg) {

if (!this.initDataStream()) {
log.error("Datastream initializaten failed. Cannot write to auditlog");
return false;
}

return super.doStore(msg, this.dataStreamName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,20 @@
import java.io.IOException;
import java.nio.file.Path;

import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.support.ConfigConstants;
import org.opensearch.security.support.HeaderHelper;
import org.opensearch.threadpool.ThreadPool;

import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public final class InternalOpenSearchSink extends AuditLogSink {
public final class InternalOpenSearchSink extends AbstractInternalOpenSearchSink {

private final Client clientProvider;
final String index;
final String type;
private DateTimeFormatter indexPattern;
private final ThreadPool threadPool;

public InternalOpenSearchSink(
final String name,
Expand All @@ -45,14 +38,12 @@ public InternalOpenSearchSink(
ThreadPool threadPool,
AuditLogSink fallbackSink
) {
super(name, settings, settingsPrefix, fallbackSink);
this.clientProvider = clientProvider;
Settings sinkSettings = getSinkSettings(settingsPrefix);
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, null);

Settings sinkSettings = getSinkSettings(settingsPrefix);
this.index = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_INDEX, "'security-auditlog-'YYYY.MM.dd");
this.type = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_TYPE, null);

this.threadPool = threadPool;
try {
this.indexPattern = DateTimeFormat.forPattern(index);
} catch (IllegalArgumentException e) {
Expand All @@ -69,29 +60,6 @@ public void close() throws IOException {
}

public boolean doStore(final AuditMessage msg) {

if (Boolean.parseBoolean(
HeaderHelper.getSafeFromHeader(threadPool.getThreadContext(), ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER)
)) {
if (log.isTraceEnabled()) {
log.trace("audit log of audit log will not be executed");
}
return true;
}

try (StoredContext ctx = threadPool.getThreadContext().stashContext()) {
try {
final IndexRequestBuilder irb = clientProvider.prepareIndex(getExpandedIndexName(indexPattern, index))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setSource(msg.getAsMap());
threadPool.getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER, "true");
irb.setTimeout(TimeValue.timeValueMinutes(1));
irb.execute().actionGet();
return true;
} catch (final Exception e) {
log.error("Unable to index audit log {} due to", msg, e);
return false;
}
}
return super.doStore(msg, getExpandedIndexName(this.indexPattern, this.index));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ private final AuditLogSink createSink(final String name, final String type, fina
case "internal_opensearch":
sink = new InternalOpenSearchSink(name, settings, settingsPrefix, configPath, clientProvider, threadPool, fallbackSink);
break;
case "internal_opensearch_data_stream":
sink = new InternalOpenSearchDataStreamSink(
name,
settings,
settingsPrefix,
configPath,
clientProvider,
threadPool,
fallbackSink
);
break;
case "external_opensearch":
try {
sink = new ExternalOpenSearchSink(name, settings, settingsPrefix, configPath, fallbackSink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ public class ConfigConstants {

public static final String SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX = "plugins.security.audit.config.";

// Internal Opensearch data_stream
public static final String SECURITY_AUDIT_OPENSEARCH_DATASTREAM_NAME = "data_stream.name";
public static final String SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_MANAGE = "data_stream.template.manage";
public static final String SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NAME = "data_stream.template.name";
public static final String SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_REPLICAS = "data_stream.template.number_of_replicas";
public static final String SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_SHARDS = "data_stream.template.number_of_shards";

// Internal / External OpenSearch
public static final String SECURITY_AUDIT_OPENSEARCH_INDEX = "index";
public static final String SECURITY_AUDIT_OPENSEARCH_TYPE = "type";
Expand Down
Loading

0 comments on commit 9c5e32a

Please sign in to comment.