Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ti_threatconnect: fix handling of missing cursor.last_timestamp #13235

Merged
merged 1 commit into from
Mar 27, 2025

Conversation

efd6
Copy link
Contributor

@efd6 efd6 commented Mar 21, 2025

Proposed commit message

ti_threatconnect: fix handling of missing cursor.last_timestamp

It is possible to get into a state where the program expects there to be
a last_timestamp in the cursor but none exists due to previous data
being present but empty. Fix this by falling back to the look-back time
if the last_timestamp is missing.

Also reorganise the code to make the logic less opaque.

Checklist

  • I have reviewed tips for building integrations and this pull request is aligned with them.
  • I have verified that all data streams collect metrics or logs.
  • I have added an entry to my package's changelog.yml file.
  • I have verified that Kibana version constraints are current according to guidelines.
  • I have verified that any added dashboard complies with Kibana's Dashboard good practices

Author's Checklist

  • [ ]

How to test this PR locally

Related issues

Screenshots

@efd6 efd6 added bugfix Pull request that fixes a bug issue Team:Security-Service Integrations Security Service Integrations team [elastic/security-service-integrations] Integration:ti_threatconnect ThreatConnect (Partner supported) labels Mar 21, 2025
@efd6 efd6 self-assigned this Mar 21, 2025
@efd6 efd6 force-pushed the ti_threatconnect_no_last_time branch from dce9602 to e2c7c5b Compare March 21, 2025 05:10
@elastic-vault-github-plugin-prod
Copy link

elastic-vault-github-plugin-prod bot commented Mar 21, 2025

🚀 Benchmarks report

To see the full report comment with /test benchmark fullreport

Comment on lines +130 to +141
"first_timestamp": (
!(has(body.data) && has(state.?cursor.first_timestamp)) ?
// We don't have any data or a first_timestamp. Limit to look-back.
string(now - duration(state.initial_interval))
: (has(body.next) && body.next != null && body.next != "") ?
// want_more is true, so limit to first timestamp.
state.cursor.first_timestamp
:
// We have data, but want_more is false, limit to last available
// timestamp falling back to look-back.
state.?cursor.last_timestamp.orValue(string(now - duration(state.initial_interval)))
),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block is an algebraic reorganisation to allow chained conditionals, rather than the nested organisation in the previous version. It also has the .orValue(…) that fixes the bug.

I can't help but feel that this is more complex than it needs to be, and I recall that this is something that I felt previously, but I have not simplified in case there is something that I'm missing in understanding the subtleties of the input.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mohitjha-elastic I know we discussed this in the past, but when I look at this with the new structure, I can't help but think that this could be simpler, can you explain again why we need the three state conditional that we have here?

Copy link
Contributor

@mohitjha-elastic mohitjha-elastic Mar 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@efd6
The first_timestamp is used to handle paginated calls.

In the initial PR, the logic had three conditions because cursor.last_timestamp has not orValue implemented and it was not set during the first call. The three conditions that you tweaked here was necessary in the initial PR and here are the elaborations of the same:

first_timestamp would be set on the below basis:

  1. No data or first call ( when first_timestamp is not set) → Set to (now - initial_interval).
  2. Data exists and more pages are available → Set to cursor.first_timestamp.
  3. Data exists but no more pages → Set to cursor.last_timestamp.

With the latest changes, cursor.last_timestamp will have orValue((now - duration(state.initial_interval)).format(time_layout.RFC3339)) hence it now hold (now - initial_interval) from the very first call. This allows us to simplify the condition as follows:

If data exists and more pages are available → Set first_timestamp to cursor.first_timestamp.
Otherwise → Set first_timestamp to cursor.last_timestamp.orValue(now - 24)

This change makes the logic more streamlined while maintaining its correctness. Let me know your thoughts on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

@efd6 efd6 marked this pull request as ready for review March 21, 2025 05:45
@efd6 efd6 requested a review from a team as a code owner March 21, 2025 05:45
@elasticmachine
Copy link

Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)

@mohitjha-elastic
Copy link
Contributor

LGTM!

Copy link
Contributor

@chrisberkhout chrisberkhout left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to pass two timestamps forward. We're using state.want_more to choose between timestamps to build tql, but then we ignore that if state.want_more is true.

The results are always sorted lastModified asc, so I think it can be simplified to:

Request
if state.next_url is present, use it
else build a request using cursor.last_timestamp if present
else build a request using the initial interval

Response
set cursor.last_timestamp to the max timestamp seen, if any
set cursor.next_url if a next URL is available, other clear it

One extra bit: doing it as above means the initial interval would keep moving forward until some data is returned. That's probably okay, but if we want to lock in the starting point and repeat from there until we get data, we would just set it in cursor.last_timestamp if there's no existing cursor.last_timestamp or returned data.

@efd6
Copy link
Contributor Author

efd6 commented Mar 25, 2025

@chrisberkhout For the bug fix, I would prefer the minimal change. I agree that the logic is likely more complicated than is needed, but I think improving that should be an enhancement; when the enhancement includes the text "that's probably okay", I get iffy when I know that the bug fix will work.

Copy link
Contributor

@chrisberkhout chrisberkhout left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chrisberkhout For the bug fix, I would prefer the minimal change. I agree that the logic is likely more complicated than is needed, but I think improving that should be an enhancement; when the enhancement includes the text "that's probably okay", I get iffy when I know that the bug fix will work.

Even avoiding the "probably" part, my point is that the complexity that generated the bug can be removed.

If the current PR fixes it, I don't want to hold it up, so this review is "Comment" rather than "Request changes".

Also when I tried the simplification there was a bit more to it than I thought. So merging as is is still a good option.


I found it hard to be immediately confident in the change when it's still doing unnecessary things. The fix can add less and delete more.

My preference is to go straight to a simplification, that:

  • Uses the provided next URL instead of rebuilding it
  • Builds a request URL in one place
  • Uses state.initial_interval once
  • Propagates one timestamp

I had credentials handy and the version below works with 2 pages of data then polling at subsequent intervals. It will also keep its place during restarts.

The system test fails because the next value isn't a working URL. I guess there's one or two ways to fix that.

diff --git a/packages/ti_threatconnect/data_stream/indicator/agent/stream/cel.yml.hbs b/packages/ti_threatconnect/data_stream/indicator/agent/stream/cel.yml.hbs
index 1eb0bd305d..e909164c45 100644
--- a/packages/ti_threatconnect/data_stream/indicator/agent/stream/cel.yml.hbs
+++ b/packages/ti_threatconnect/data_stream/indicator/agent/stream/cel.yml.hbs
@@ -20,147 +20,113 @@ fields:
   _conf:
     ioc_expiration_duration: "{{ioc_expiration_duration}}"
 {{/if}}
 
 # This specifies SSL/TLS configuration. If the ssl section is missing, the host’s CAs are used for HTTPS connections.
 # If set to a boolean, this controls if SSL verification is enabled or not.
 {{#if ssl}}
 resource.ssl: {{ssl}}
 {{/if}}
 
 # Timeout for contacting TC.
 {{#if http_client_timeout}}
 resource.timeout: {{http_client_timeout}}
 {{/if}}
 
 # Set the base url.
 resource.url: {{url}}
 
 state:
   access_id: {{escape_string access_id}}
   secret_key: {{escape_string secret_key}}
-  counter: 0
   want_more: false
   batch: {{batch_size}}
   initial_interval: {{initial_interval}}
   event_list:
 {{#if include_group_assoc}}
     - associatedGroups
 {{/if}}
 {{#if include_group_assoc_attribs}}
     - associatedGroups.attributes
 {{/if}}
 {{#if include_indicator_assoc}}
     - associatedIndicators
 {{/if}}
 {{#if include_attributes}}
     - attributes
 {{/if}}
     - securityLabels
     - sightings
     - tags
     - threatAssess
   tql_filter: {{tql}}
 
 # Hide the secret_key.
 redact:
   fields:
     - secret_key
 
 # The program section is where the logic of the stream processor is defined.
 # Notice the format for the last timestamp does not include milliseconds. The default format included
 # milliseconds and if it ended in 0, that 0 would be dropped and TC TQL then would error on the timestamp.
 program: |
-  ['lastModified GEQ "'+(
-      !state.want_more ?
-          state.?cursor.last_timestamp.orValue((now - duration(state.initial_interval)).format(time_layout.RFC3339))
-      :
-          state.?cursor.first_timestamp.orValue("")
-      )+'"'+(
-          state.?tql_filter.orValue("") != "" ?
-              " AND "+state.tql_filter.trim(" ")
-          :
-              ""
-      )
-  ].as(tql,
+  state.?cursor.last_timestamp.orValue(
+    (now - duration(state.initial_interval)).format(time_layout.RFC3339)
+  ).as(start,
       request("GET",
-          state.want_more ?
-              state.next_url
-          :
+          state.?cursor.next_url.orValue(
               state.url.trim_right("/") + "/api/v3/indicators?" + {
                   "fields": state.event_list,
                   "resultStart": ["0"],
                   "resultLimit": [string(state.batch)],
                   "sorting": ["lastModified asc"],
-                  "tql": tql,
+                  "tql": ['lastModified GEQ "'+start+'"' + state.?tql_filter.optMap(f, " AND "+f.trim(" ")).orValue("")],
               }.format_query()
+          )
       ).as(req, req.URL.parse_url().as(url, req.with({
           "Header": {
               "Authorization": ["TC "+ string(state.access_id) + ":" +
                   bytes(url.Path + (url.RawQuery == "" ? "" : "?") + url.RawQuery + ":" + req.Method + ":" + string(int(now))).hmac("sha256", bytes(state.secret_key)).base64()
               ],
               "Timestamp": [string(int(now))],
           }
       }))).do_request().as(resp, resp.StatusCode == 200 ?
           bytes(resp.Body).decode_json().as(body, {
               "events": body.data.map(e, {
                   "message": e.encode_json(),
               }),
-              "url": state.url.trim_right("/"),
-              "counter": has(body.next) && body.next != null && body.next != "" ? int(state.counter) + int(state.batch) : 0,
+              "url": state.url,
               "access_id": state.access_id,
               "secret_key": state.secret_key,
-              "want_more": has(body.next) && body.next != null && body.next != "",
+              "want_more": has(body.next),
               "batch": state.batch,
               "initial_interval": state.initial_interval,
               "event_list": state.event_list,
               ?"tql_filter": state.?tql_filter,
-              "next_url": (
-                  has(body.next) && body.next != null && body.next != "" ?
-                      state.url.trim_right("/") + "/api/v3/indicators?" + {
-                          "fields": state.event_list,
-                          "resultStart": [string(int(state.counter) + body.data.size())],
-                          "resultLimit": [string(state.batch)],
-                          "sorting": ["lastModified asc"],
-                          "tql": tql,
-                      }.format_query()
-                  :
-                      state.url.trim_right("/")
-              ),
               "cursor": {
-                  ?"last_timestamp": (
+                  ?"next_url": body.?next,
+                  "last_timestamp": (
                       has(body.data) && body.data.size() > 0 ?
-                          optional.of(body.data.map(e, timestamp(e.lastModified)).max() + duration("1s"))
-                      :
-                          state.?cursor.last_timestamp
-                  ),
-                  "first_timestamp": (
-                      has(body.data) && state.?cursor.first_timestamp.orValue(null) != null ?
-                          (
-                              has(body.next) && body.next != null && body.next != "" ?
-                                  state.cursor.first_timestamp
-                              :
-                                  state.cursor.last_timestamp
-                          )
+                          body.data.map(e, timestamp(e.lastModified)).max() + duration("1s")
                       :
-                          string(now - duration(state.initial_interval))
+                          start
                   ),
               }
           })
       :
           state.with({
               "events": {
                   "error": {
                       "code": string(resp.StatusCode),
                       "id": string(resp.Status),
                       "message": "GET:"+(
                           size(resp.Body) != 0 ?
                               string(resp.Body)
                           :
                               string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                       ),
                   },
               },
               "want_more": false,
           })
       )
   )

It is possible to get into a state where the program expects there to be
a last_timestamp in the cursor but none exists due to previous data
being present but empty. Fix this by falling back to the look-back time
if the last_timestamp is missing.

Also reorganise the code to make the logic less opaque.
@efd6 efd6 force-pushed the ti_threatconnect_no_last_time branch from e2c7c5b to 5c821fb Compare March 27, 2025 20:48
@efd6
Copy link
Contributor Author

efd6 commented Mar 27, 2025

@chrisberkhout Please confirm whether the changes to dest.fleet_transform_version and dest.index are required.

And if you are happy for the minimal fix to be merged, can you dismiss your request for changes?

@elasticmachine
Copy link

💚 Build Succeeded

History

cc @efd6

@chrisberkhout chrisberkhout dismissed their stale review March 27, 2025 21:42

Larger-scope changes discussed in following review as optional.

@chrisberkhout
Copy link
Contributor

@chrisberkhout Please confirm whether the changes to dest.fleet_transform_version and dest.index are required.

I think they are, unfortunately. Not because we actually want to rebuild the destination index, but just because the name of the compatibility pipeline changes and therefore the transform definition also changes.

I guess every new package version will now rebuild the destination index and leave behind a stale version of it.

And if you are happy for the minimal fix to be merged, can you dismiss your request for changes?

👍 Thought my last review was going to override it. It's dismissed now.

@efd6 efd6 merged commit ca62d96 into elastic:main Mar 27, 2025
7 checks passed
@efd6
Copy link
Contributor Author

efd6 commented Mar 27, 2025

Follow-up: #13336

@elastic-vault-github-plugin-prod

Package ti_threatconnect - 1.9.3 containing this change is available at https://epr.elastic.co/package/ti_threatconnect/1.9.3/

flexitrev pushed a commit that referenced this pull request Mar 28, 2025
It is possible to get into a state where the program expects there to be
a last_timestamp in the cursor but none exists due to previous data
being present but empty. Fix this by falling back to the look-back time
if the last_timestamp is missing.

Also reorganise the code to make the logic less opaque.
flexitrev pushed a commit that referenced this pull request Mar 28, 2025
It is possible to get into a state where the program expects there to be
a last_timestamp in the cursor but none exists due to previous data
being present but empty. Fix this by falling back to the look-back time
if the last_timestamp is missing.

Also reorganise the code to make the logic less opaque.
flexitrev pushed a commit that referenced this pull request Mar 28, 2025
It is possible to get into a state where the program expects there to be
a last_timestamp in the cursor but none exists due to previous data
being present but empty. Fix this by falling back to the look-back time
if the last_timestamp is missing.

Also reorganise the code to make the logic less opaque.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bugfix Pull request that fixes a bug issue Integration:ti_threatconnect ThreatConnect (Partner supported) Team:Security-Service Integrations Security Service Integrations team [elastic/security-service-integrations]
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants