-
Notifications
You must be signed in to change notification settings - Fork 164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RORDEV-1263] Data stream support in ROR audit #1076
base: develop
Are you sure you want to change the base?
Conversation
_ <- Task.delay(logger.info(s"Trying to setup ROR audit sink with data stream ${settings.dataStreamName.show}..")) | ||
_ <- service.createIndexLifecyclePolicy(settings.lifecyclePolicy, settings.lifecyclePolicyJson(service.capabilities.ilmMaxPrimaryShardSize)) | ||
_ <- service.createComponentTemplateForMappings(settings.componentTemplateMappingsId, settings.mappings, settings.mappingsMetadata) | ||
_ <- service.createComponentTemplateForIndex(settings.componentTemplateSettingsId, settings.lifecyclePolicy, settings.indexSettingsMetadata) | ||
_ <- service.createIndexTemplate(settings.indexTemplate, settings.dataStreamName, NonEmptyList.of(settings.componentTemplateMappingsId, settings.componentTemplateSettingsId), settings.indexTemplateMetadata) | ||
_ <- service.createDataStream(settings.dataStreamName) | ||
_ <- Task.delay(logger.info(s"ROR audit data stream ${settings.dataStreamName.show} created.")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, we should handle ACKs from service responses. We will probably need to query the resources to ensure they are available. Some of the queries would require new methods based on reflection.
Now, if the request fails on any step, it cannot be resumed (we need something like creating resources if not exist for every one of the given resources)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if I understand the problem. Could you please explain deeply?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's assume that creating an index lifecycle request returned the following response:
{
"acknowledged": false
}
This means that the master node did not confirm the request. As a result, the index lifecycle policy was not successfully registered in the cluster.
In the next request, we attempt to create a component template that references this lifecycle policy. However, since the master node is unaware of the index lifecycle policy, the request fails.
If you retry to save ROR settings again, you send a creation request for the lifecycle policy and you can get a conflict. The policy name was created in a previous attempt. This leaves us stuck with an index lifecycle policy but without a data stream.
To avoid this, we should proceed with further requests only after the lifecycle policy is confirmed by the master node (we should check the acknowledge response). Or if we resume the creation process, we should skip the request if the lifecycle policy already exists.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, IMO it's very important that all of these resource creation methods are idempotent. If the resource (eg. ilm policy exists, we should not create one).
core/src/main/scala/tech/beshu/ror/accesscontrol/audit/AuditingTool.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/tech/beshu/ror/accesscontrol/audit/AuditingTool.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/tech/beshu/ror/accesscontrol/audit/AuditingTool.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/tech/beshu/ror/accesscontrol/audit/AuditingTool.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/tech/beshu/ror/accesscontrol/audit/AuditingTool.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/tech/beshu/ror/accesscontrol/domain/indices.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/tech/beshu/ror/accesscontrol/audit/DataStreamAuditSinkCreator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/tech/beshu/ror/accesscontrol/audit/DataStreamAuditSinkCreator.scala
Outdated
Show resolved
Hide resolved
.leftMap { | ||
case RorAuditDataStream.CreationError.FormatError(msg) => | ||
AuditingSettingsCreationError(Message( | ||
s"Illegal format for 'data_stream' - ${msg.show}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add an info that is the ROR audit data stream
case other => | ||
unsupportedOutputTypeError( | ||
unsupportedType = other, | ||
supportedTypes = NonEmptyList.of("data_stream", "index", "log") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we take info consideration the es version also here (I mean when we create the supportedTypes list)
@@ -263,21 +266,28 @@ class ReadonlyRest(coreFactory: CoreFactory, | |||
} | |||
|
|||
object ReadonlyRest { | |||
type AuditSinkCreator = AuditCluster => AuditSinkService | |||
trait AuditSinkCreator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this trait should be defined in the tech.beshu.ror.accesscontrol.audit
package. WDYT?
core.rorConfig.auditingSettings | ||
.flatMap(settings => AuditingTool.create(settings, auditSinkCreator)(environmentConfig.clock, loggingContext)) | ||
.map(settings => AuditingTool.create(settings, auditSinkCreator.index, auditSinkCreator.dataStream)(using environmentConfig.clock, loggingContext)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we could pass here AuditSinkCreator
instead of extracting data stream and index creators at this level. WDYT?
@@ -28,13 +28,28 @@ object DurationOps { | |||
implicit class RefinedDurationOps(val duration: Duration) extends AnyVal { | |||
def toRefinedPositive: Either[String, PositiveFiniteDuration] = duration match { | |||
case v: FiniteDuration if v.toMillis > 0 => | |||
v.toCoarsest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to remove?
@@ -61,39 +61,43 @@ class AuditingToolTests extends AnyWordSpec with MockFactory with BeforeAndAfter | |||
"request was allowed and verbosity level was ERROR" in { | |||
val auditingTool = AuditingTool.create( | |||
settings = auditSettings(new DefaultAuditLogSerializer), | |||
auditSinkServiceCreator = _ => mock[AuditSinkService] | |||
).get | |||
indexAuditSinkCreator = _ => mock[IndexBasedAuditSinkService], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we add datastream-related tests in this suite?
@@ -1354,16 +1352,16 @@ class ReadonlyRestStartingTests | |||
mapWithIntervalFrom(refreshInterval) ++ | |||
mapWithMaxYamlSize(maxYamlSize) ++ | |||
Map( | |||
"com.readonlyrest.settings.loading.delay" -> "0" | |||
"com.readonlyrest.settings.loading.delay" -> "1" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's intentional. The FiniteDuration type has been changed to the PositiveFiniteDuration. The value of 0 became invalid, so the default was used (5 seconds) in the tests. With this change, I reduced all unit test execution time by 5 minutes.
"com.readonlyrest.settings.loading.delay" -> "0" | ||
) | ||
Map( | ||
"com.readonlyrest.settings.loading.delay" -> "1" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same here
new EsAuditSinkService(client) | ||
case remote: AuditCluster.RemoteAuditCluster => | ||
HighLevelClientAuditSinkService.create(remote) | ||
private def auditSinkCreator: AuditSinkCreator = new AuditSinkCreator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe instead of throwing the illegal state ex, we can make this state impossible?
Eg. by having two AuditSinkCreator traits? One supporting only index sink. And the second one supporting both?
class EsAuditSinkService(client: Client, | ||
threadPool: ThreadPool) | ||
extends AuditSinkService | ||
final class EsAuditSinkService(client: NodeClient, threadPool: ThreadPool, jsonParserFactory: XContentJsonParserFactory) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we shouldn't rename it to NodeClientBasedAuditSinkService
. WDYT?
No description provided.