Add porting files of Kafka v0.45.0 (#400)
* Add porting files of Kafka v0.45.0

Signed-off-by: Tyler Gu <[email protected]>

Add Kafka operator porting context

Signed-off-by: Tyler Gu <[email protected]>

* Fix parsing the OneOf schemas

Signed-off-by: Tyler Gu <[email protected]>

* Fix test generator of OneOf schema

Signed-off-by: Tyler Gu <[email protected]>

* Simplify Kafka's authentication schema

Signed-off-by: Tyler Gu <[email protected]>

* Prepare for Kafka config test

Signed-off-by: Tyler Gu <[email protected]>

* Add Kafka config-test config

Signed-off-by: Tyler Gu <[email protected]>

* Fix Kafka config to use the config module

Signed-off-by: Tyler Gu <[email protected]>

* Fix Kafka broker config schema

Signed-off-by: Tyler Gu <[email protected]>

* Fix Kafka config module

Signed-off-by: Tyler Gu <[email protected]>

* Add Kafka log parser

Signed-off-by: Tyler Gu <[email protected]>

* Fix Kafka broker config

Signed-off-by: Tyler Gu <[email protected]>

* Update Kafka fault injection config

Signed-off-by: Tyler Gu <[email protected]>

* Ignore last state of containers

Signed-off-by: Tyler Gu <[email protected]>

* Add oracle for Kafka

* Update Kafka configuration testing config

* Fix oracle comparison issue

* Fix value extraction bug in oracle

* Fix the negative value extraction bug

---------

Signed-off-by: Tyler Gu <[email protected]>
Co-authored-by: TZ-zzz <[email protected]>
tylergu and TZ-zzz authored Feb 21, 2025
1 parent 572fc01 commit d8e4944
Showing 17 changed files with 39,715 additions and 104 deletions.
2 changes: 1 addition & 1 deletion acto/engine.py
@@ -888,7 +888,7 @@ def __init__(

self.sequence_base = 0

self.custom_oracle: Optional[type[CheckerInterface]] = None
self.custom_checker: Optional[type[CheckerInterface]] = None
self.custom_on_init: Optional[Callable] = None
if operator_config.custom_oracle is not None:
module = importlib.import_module(operator_config.custom_oracle)
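The rename from `custom_oracle` to `custom_checker` only touches the attribute name; the `operator_config.custom_oracle` key and the importlib-based loading stay as they were. A minimal standalone sketch of that loading pattern (the `CHECKER` and `ON_INIT` attribute names here are hypothetical illustrations, not acto's actual module contract):

```python
import importlib
from typing import Callable, Optional


def load_custom_checker_module(module_path: Optional[str]):
    """Import a user-supplied oracle module and pull optional hooks off it.

    `CHECKER` and `ON_INIT` are illustrative attribute names only.
    """
    custom_checker = None
    custom_on_init: Optional[Callable] = None
    if module_path is not None:
        module = importlib.import_module(module_path)
        custom_checker = getattr(module, "CHECKER", None)
        custom_on_init = getattr(module, "ON_INIT", None)
    return custom_checker, custom_on_init
```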
20 changes: 20 additions & 0 deletions acto/input/test_generators/primitive.py
@@ -18,6 +18,7 @@
OpaqueSchema,
StringSchema,
)
from acto.schema.oneof import OneOfSchema
from acto.utils.thread_logger import get_thread_logger


@@ -62,6 +63,25 @@ def any_of_tests(schema: AnyOfSchema):
return ret


@test_generator(property_type="OneOf", priority=Priority.PRIMITIVE)
def one_of_tests(schema: OneOfSchema):
"""Generate testcases for AnyOf type"""

ret: list[TestCase] = []
if schema.enum is not None:
for case in schema.enum:
ret.append(EnumTestCase(case, primitive=True))
else:
for sub_schema in schema.possibilities:
testcases = resolve_testcases(sub_schema)
for testcase in testcases:
testcase.add_precondition(
SchemaPrecondition(sub_schema).precondition
)
ret.extend(testcases)
return ret
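The new OneOf generator mirrors the AnyOf one above it: each sub-schema's test cases are gated on a precondition so they fire only when the property's current value already conforms to that sub-schema. A rough sketch of that gating idea, with a hypothetical `make_precondition` standing in for `SchemaPrecondition.precondition`:

```python
from typing import Any, Callable


def make_precondition(sub_schema: dict) -> Callable[[Any], bool]:
    """Gate a test case on the previous value matching `sub_schema` (sketch only)."""

    def matches(prev_value: Any) -> bool:
        expected = sub_schema.get("type")
        if expected == "string":
            return isinstance(prev_value, str)
        if expected == "object":
            return isinstance(prev_value, dict)
        return True  # permissive fallback for this illustration

    return matches


# A string-typed possibility's cases run only while the property holds a
# string, so mutations stay within a single branch of the oneOf.
precond = make_precondition({"type": "string"})
assert precond("scram-sha-512") and not precond({"type": "tls"})
```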


@test_generator(property_type="Array", priority=Priority.PRIMITIVE)
def array_tests(schema: ArraySchema):
"""Representation of an array node
223 changes: 130 additions & 93 deletions acto/parse_log/parse_log.py
@@ -1,61 +1,73 @@
import json
import logging
import re

from acto.common import get_thread_logger

klog_regex = r'^\s*'
klog_regex += r'(\w)' # group 1: level
KLOG_REGEX = r"^\s*"
KLOG_REGEX += r"(\w)" # group 1: level
# group 2-7: timestamp
klog_regex += r'(\d{2})(\d{2})\s(\d{2}):(\d{2}):(\d{2})\.(\d{6})'
klog_regex += r'\s+'
klog_regex += r'(\d+)' # group 8
klog_regex += r'\s'
klog_regex += r'(.+):' # group 9: filename
klog_regex += r'(\d+)' # group 10: lineno
klog_regex += r'\]\s'
klog_regex += r'(.*?)' # group 11: message
klog_regex += r'\s*$'

logr_regex = r'^\s*'
KLOG_REGEX += r"(\d{2})(\d{2})\s(\d{2}):(\d{2}):(\d{2})\.(\d{6})"
KLOG_REGEX += r"\s+"
KLOG_REGEX += r"(\d+)" # group 8
KLOG_REGEX += r"\s"
KLOG_REGEX += r"(.+):" # group 9: filename
KLOG_REGEX += r"(\d+)" # group 10: lineno
KLOG_REGEX += r"\]\s"
KLOG_REGEX += r"(.*?)" # group 11: message
KLOG_REGEX += r"\s*$"

LOGR_REGEX = r"^\s*"
# group 1: timestamp
logr_regex += r'(\d{4}\-\d{2}\-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)'
logr_regex += r'\s+([A-Z]+)' # group 2: level
logr_regex += r'\s+(\S+)' # group 3: source
logr_regex += r'\s+(.*?)' # group 4: message
logr_regex += r'\s*$'
LOGR_REGEX += r"(\d{4}\-\d{2}\-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)"
LOGR_REGEX += r"\s+([A-Z]+)" # group 2: level
LOGR_REGEX += r"\s+(\S+)" # group 3: source
LOGR_REGEX += r"\s+(.*?)" # group 4: message
LOGR_REGEX += r"\s*$"

# 1.6599427639039357e+09 INFO controllers.CassandraDatacenter Reconcile loop completed {"cassandradatacenter": "cass-operator/test-cluster", "requestNamespace": "cass-operator", "requestName": "test-cluster", "loopID": "be419d0c-c7d0-4dfa-8596-af94ea15d4f6", "duration": 0.253729569}
logr_special_regex = r'^\s*'
logr_special_regex += r'(\d{1}\.\d+e\+\d{2})' # group 1: timestamp
logr_special_regex += r'\s+([A-Z]+)' # group 2: level
logr_special_regex += r'\s+(\S+)' # group 3: source
logr_special_regex += r'\s+(.*?)' # group 4: message
logr_special_regex += r'\s*$'
LOGR_SPECIAL_REGEX = r"^\s*"
LOGR_SPECIAL_REGEX += r"(\d{1}\.\d+e\+\d{2})" # group 1: timestamp
LOGR_SPECIAL_REGEX += r"\s+([A-Z]+)" # group 2: level
LOGR_SPECIAL_REGEX += r"\s+(\S+)" # group 3: source
LOGR_SPECIAL_REGEX += r"\s+(.*?)" # group 4: message
LOGR_SPECIAL_REGEX += r"\s*$"

# time="2022-08-08T03:21:28Z" level=debug msg="Sentinel is not monitoring the correct master, changing..." src="checker.go:175"
# time="2022-08-08T03:21:56Z" level=info msg="deployment updated" deployment=rfs-test-cluster namespace=acto-namespace service=k8s.deployment src="deployment.go:102"
logrus_regex = r'^\s*'
LOGRUS_REGEX = r"^\s*"
# group 1: timestamp
logrus_regex += r'time="(\d{4}\-\d{2}\-\d{2}T\d{2}:\d{2}:\d{2}Z)"'
logrus_regex += r'\s+level=([a-z]+)' # group 2: level
logrus_regex += r'\s+msg="(.*?[^\\])"' # group 3: message
logrus_regex += r'.*'
logrus_regex += r'\s+(src="(.*?)")?' # group 4: src
logrus_regex += r'\s*$'
LOGRUS_REGEX += r'time="(\d{4}\-\d{2}\-\d{2}T\d{2}:\d{2}:\d{2}Z)"'
LOGRUS_REGEX += r"\s+level=([a-z]+)" # group 2: level
LOGRUS_REGEX += r'\s+msg="(.*?[^\\])"' # group 3: message
LOGRUS_REGEX += r".*"
LOGRUS_REGEX += r'\s+(src="(.*?)")?' # group 4: src
LOGRUS_REGEX += r"\s*$"
# this is semi-auto generated by copilot, holy moly

# This one is similar to logr_special_regex and logrus_regex, but with some differences
# This one is similar to LOGR_SPECIAL_REGEX and LOGRUS_REGEX, but with some differences
# 2024-03-05T10:07:17Z ERROR GrafanaReconciler reconciler error in stage {"controller": "grafana", "controllerGroup": "grafana.integreatly.org", "controllerKind": "Grafana", "Grafana": {"name":"test-cluster","namespace":"grafana"}, "namespace": "grafana", "name": "test-cluster", "reconcileID": "5aa39e3e-d5d3-47fc-848d-c3d15dfbcc3d", "stage": "deployment", "error": "Deployment.apps \"test-cluster-deployment\" is invalid: [spec.template.spec.containers[0].image: Required value, spec.template.spec.affinity.podAntiAffinity.requiredDuringSchedulingIgnoredDuringExecution[0].topologyKey: Required value: can not be empty, spec.template.spec.affinity.podAntiAffinity.requiredDuringSchedulingIgnoredDuringExecution[0].topologyKey: Invalid value: \"\": name part must be non-empty, spec.template.spec.affinity.podAntiAffinity.requiredDuringSchedulingIgnoredDuringExecution[0].topologyKey: Invalid value: \"\": name part must consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 'MyName', or 'my.name', or '123-abc', regex used for validation is '([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]')]"}
grafana_logr_regex = r'^\s*'
grafana_logr_regex += r'(\d{4}\-\d{2}\-\d{2}T\d{2}:\d{2}:\d{2}Z)' # Group 1: timestamp
grafana_logr_regex += r'\s+([A-Z]+)' # Group 2: level
grafana_logr_regex += r'\s+(\S+)' # Group 3: Source
grafana_logr_regex += r'\s+(.*?)' # Group 4: Message
grafana_logr_regex += r'\s*$' # Take up any remaining whitespace
GRAFANA_LOGR_REGEX = r"^\s*"
GRAFANA_LOGR_REGEX += (
r"(\d{4}\-\d{2}\-\d{2}T\d{2}:\d{2}:\d{2}Z)" # Group 1: timestamp
)
GRAFANA_LOGR_REGEX += r"\s+([A-Z]+)" # Group 2: level
GRAFANA_LOGR_REGEX += r"\s+(\S+)" # Group 3: Source
GRAFANA_LOGR_REGEX += r"\s+(.*?)" # Group 4: Message
GRAFANA_LOGR_REGEX += r"\s*$" # Take up any remaining whitespace

# Kafka log format
# 2025-01-24 22:52:03 WARN AbstractConfiguration:93 - Reconciliation #27(watch) Kafka(acto-namespace/test-cluster): Configuration option "process.roles" is forbidden and will be ignored
KAFKA_LOG_REGEX = r"^\s*"
KAFKA_LOG_REGEX += (
r"(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\s+" # Group 1: timestamp
)
KAFKA_LOG_REGEX += r"(INFO|WARN|ERROR|DEBUG)\s+" # Group 2: level
KAFKA_LOG_REGEX += r"(\S+):(\d+)\s+-" # Group 3: Source
KAFKA_LOG_REGEX += r"\s+(.*?)$" # Group 4: Message


def parse_log(line: str) -> dict:
'''Try to parse the log line with some predefined format
"""Try to parse the log line with some predefined format
Currently only support three formats:
- klog
@@ -65,72 +77,97 @@ def parse_log(line: str) -> dict:
Returns:
a dict containing 'level' and 'message'
'level' will always be a lowercase string
'''
"""
logger = get_thread_logger(with_prefix=True)

log_line = {}
if re.search(klog_regex, line) != None:
if re.search(KLOG_REGEX, line) is not None:
# log is in klog format
match = re.search(klog_regex, line)
if match.group(1) == 'E':
log_line['level'] = 'error'
elif match.group(1) == 'I':
log_line['level'] = 'info'
elif match.group(1) == 'W':
log_line['level'] = 'warn'
elif match.group(1) == 'F':
log_line['level'] = 'fatal'

log_line['msg'] = match.group(11)
elif re.search(logr_regex, line) != None:
match = re.search(KLOG_REGEX, line)
if match is None:
logger.debug("parse_log() cannot parse line %s", line)
return {}
if match.group(1) == "E":
log_line["level"] = "error"
elif match.group(1) == "I":
log_line["level"] = "info"
elif match.group(1) == "W":
log_line["level"] = "warn"
elif match.group(1) == "F":
log_line["level"] = "fatal"

log_line["msg"] = match.group(11)
elif re.search(LOGR_REGEX, line) is not None:
# log is in logr format
match = re.search(logr_regex, line)
log_line['level'] = match.group(2).lower()
log_line['msg'] = match.group(4)
elif re.search(logr_special_regex, line) != None:
match = re.search(LOGR_REGEX, line)
if match is None:
logger.debug("parse_log() cannot parse line %s", line)
return {}
log_line["level"] = match.group(2).lower()
log_line["msg"] = match.group(4)
elif re.search(LOGR_SPECIAL_REGEX, line) is not None:
# log is in logr special format
match = re.search(logr_special_regex, line)
log_line['level'] = match.group(2).lower()
log_line['msg'] = match.group(4)
elif re.search(logrus_regex, line) != None:
match = re.search(LOGR_SPECIAL_REGEX, line)
if match is None:
logger.debug("parse_log() cannot parse line %s", line)
return {}
log_line["level"] = match.group(2).lower()
log_line["msg"] = match.group(4)
elif re.search(LOGRUS_REGEX, line) is not None:
# log is in logrus format
match = re.search(logrus_regex, line)
log_line['level'] = match.group(2)
log_line['msg'] = match.group(3)
elif re.search(grafana_logr_regex, line) != None:
match = re.search(grafana_logr_regex, line)
log_line['level'] = match.group(2).lower()
log_line['msg'] = match.group(4)
match = re.search(LOGRUS_REGEX, line)
if match is None:
logger.debug("parse_log() cannot parse line %s", line)
return {}
log_line["level"] = match.group(2)
log_line["msg"] = match.group(3)
elif re.search(GRAFANA_LOGR_REGEX, line) is not None:
match = re.search(GRAFANA_LOGR_REGEX, line)
if match is None:
logger.debug("parse_log() cannot parse line %s", line)
return {}
log_line["level"] = match.group(2).lower()
log_line["msg"] = match.group(4)
elif re.search(KAFKA_LOG_REGEX, line) is not None:
match = re.search(KAFKA_LOG_REGEX, line)
if match is None:
logger.debug("parse_log() cannot parse line %s", line)
return {}
log_line["level"] = match.group(2).lower()
log_line["msg"] = match.group(5)
else:
try:
log_line = json.loads(line)
if 'level' not in log_line:
log_line['level'] = log_line['severity']
if "level" not in log_line:
log_line["level"] = log_line["severity"]

del log_line['severity']
log_line['level'] = log_line['level'].lower()
del log_line["severity"]
log_line["level"] = log_line["level"].lower()
except Exception as e:
logger.debug(f"parse_log() cannot parse line {line} due to {e}")
pass
logger.debug("parse_log() cannot parse line %s due to %s", line, e)

return log_line


if __name__ == '__main__':
# line = '   Ports: []v1.ServicePort{'
# line = 'E0714 23:11:19.386396 1 pd_failover.go:70] PD failover replicas (0) reaches the limit (0), skip failover'
# line = '{"level":"error","ts":1655678404.9488907,"logger":"controller-runtime.injectors-warning","msg":"Injectors are deprecated, and will be removed in v0.10.x"}'

# line = 'time="2022-08-08T03:21:56Z" level=info msg="deployment updated" deployment=rfs-test-cluster namespace=acto-namespace service=k8s.deployment src="deployment.go:102"'
# print(logrus_regex)
# print(parse_log(line)['msg'])

with open('testrun-2022-08-10-15-59/trial-01-0000/operator-0.log', 'r') as f:
for line in f.readlines():
print(f"Parsing log: {line}")

if parse_log(line) == {} or parse_log(line)['level'].lower() != 'error' and parse_log(line)['level'].lower() != 'fatal':
print(f'Test passed: {line} {parse_log(line)}')
else:
print(f"Message Raw: {line}, Parsed {parse_log(line)}")
break
# if __name__ == "__main__":
# line = '   Ports: []v1.ServicePort{'
# line = 'E0714 23:11:19.386396 1 pd_failover.go:70] PD failover replicas (0) reaches the limit (0), skip failover'
# line = '{"level":"error","ts":1655678404.9488907,"logger":"controller-runtime.injectors-warning","msg":"Injectors are deprecated, and will be removed in v0.10.x"}'

# line = 'time="2022-08-08T03:21:56Z" level=info msg="deployment updated" deployment=rfs-test-cluster namespace=acto-namespace service=k8s.deployment src="deployment.go:102"'
# print(LOGRUS_REGEX)
# print(parse_log(line)['msg'])

# with open("testrun-kafka-config/trial-00-0020/operator-002.log", "r") as f:
# for line in f.readlines():
# print(f"Parsing log: {line}")

# if (
# parse_log(line) == {}
# or parse_log(line)["level"].lower() != "error"
# and parse_log(line)["level"].lower() != "fatal"
# ):
# print(f"Test passed: {line} {parse_log(line)}")
# else:
# print(f"Message Raw: {line}, Parsed {parse_log(line)}")
# break
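A quick way to sanity-check the new Kafka pattern against the sample line quoted in the comment above (standalone sketch; the regex is copied verbatim from this diff):

```python
import re

KAFKA_LOG_REGEX = (
    r"^\s*"
    r"(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\s+"  # group 1: timestamp
    r"(INFO|WARN|ERROR|DEBUG)\s+"  # group 2: level
    r"(\S+):(\d+)\s+-"  # groups 3-4: source and line number
    r"\s+(.*?)$"  # group 5: message
)

line = (
    "2025-01-24 22:52:03 WARN AbstractConfiguration:93 - "
    "Reconciliation #27(watch) Kafka(acto-namespace/test-cluster): "
    'Configuration option "process.roles" is forbidden and will be ignored'
)

match = re.search(KAFKA_LOG_REGEX, line)
assert match is not None
print(match.group(2).lower())  # "warn" -> parse_log's "level"
print(match.group(5))  # free-text message -> parse_log's "msg"
```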
1 change: 1 addition & 0 deletions acto/post_process/post_diff_test.py
@@ -210,6 +210,7 @@ def compare_system_equality(
r".*\['last_update_time'\]",
r".*\['image_id'\]",
r".*\['restart_count'\]",
r".*\['status'\]\['container_statuses'\]\[.*\]\['last_state'\]",
]

if additional_exclude_paths is not None:
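The added exclude path follows the same regex-over-path convention as the entries around it, which matches DeepDiff-style paths such as `root['status']['container_statuses'][0]['last_state']`. A small sketch of the effect, assuming the comparison is backed by the `deepdiff` library (inferred from the path syntax, not confirmed by this diff; `prev`/`curr` are illustrative fragments):

```python
from deepdiff import DeepDiff

prev = {
    "status": {
        "container_statuses": [
            {"ready": True, "last_state": {"terminated": {"exit_code": 1}}}
        ]
    }
}
curr = {"status": {"container_statuses": [{"ready": True, "last_state": {}}]}}

# Without the exclusion, transient last_state churn shows up as a diff.
assert DeepDiff(prev, curr)

# With it, the two system states compare as equal.
diff = DeepDiff(
    prev,
    curr,
    exclude_regex_paths=[
        r".*\['status'\]\['container_statuses'\]\[.*\]\['last_state'\]"
    ],
)
assert not diff
```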
48 changes: 44 additions & 4 deletions acto/schema/oneof.py
@@ -2,7 +2,9 @@
from copy import deepcopy
from typing import List, Tuple

from .array import ArraySchema
from .base import BaseSchema, TreeNode
from .object import ObjectSchema


class OneOfSchema(BaseSchema):
@@ -18,10 +20,20 @@ def __init__(self, path: list, schema: dict) -> None:
for index, v in enumerate(schema["oneOf"]):
base_schema = deepcopy(schema)
del base_schema["oneOf"]
base_schema.update(v)
self.possibilities.append(
extract_schema(self.path + [str(index)], base_schema)
)
self.__recursive_update(base_schema, v)
self.possibilities.append(extract_schema(self.path, base_schema))

def __recursive_update(self, left: dict, right: dict):
    """Recursively update left dict with right dict"""
    for key, value in right.items():
        if (
            key in left
            and isinstance(left[key], dict)
            and isinstance(value, dict)
        ):
            self.__recursive_update(left[key], value)
        else:
            left[key] = value

def get_possibilities(self):
"""Return all possibilities of the anyOf schema"""
@@ -70,3 +82,31 @@ def __str__(self) -> str:
ret += ", "
ret += "]"
return ret

def __getitem__(self, key):
    if isinstance(key, int):
        for i in self.possibilities:
            if isinstance(i, ArraySchema):
                return i[key]
        raise RuntimeError("No array schema found in oneOf")
    if isinstance(key, str):
        for i in self.possibilities:
            if isinstance(i, ObjectSchema):
                return i[key]
        raise RuntimeError("No object schema found in oneOf")
    raise TypeError("Key must be either int or str")

def __setitem__(self, key, value):
    if isinstance(key, int):
        for i in self.possibilities:
            if isinstance(i, ArraySchema):
                i[key] = value
                return
        raise RuntimeError("No array schema found in oneOf")
    if isinstance(key, str):
        for i in self.possibilities:
            if isinstance(i, ObjectSchema):
                i[key] = value
                return
        raise RuntimeError("No object schema found in oneOf")
    raise TypeError("Key must be either int or str")
