-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Bug]: Version update plc4j-apache-kafka #125
Comments
You just need to checkout the github repo with the extras: https://github.com/apache/plc4x-extras/ and build that repo ... it should automatically build with the latest drivers. |
Hello @chrisdutz, I've built the folder (https://github.com/apache/plc4x-extras/tree/develop/plc4j/integrations/apache-kafka) using Maven ( When I try to use only When I try to use only And if I use both ( Do you know what might be causing these issues? According to the repo, I should only need the Is it possible to have the plugin in a folder like the ones found here: https://repo1.maven.org/maven2/org/apache/plc4x/plc4j-apache-kafka/0.12.0/? |
The "originial-*" probably is the code of the kafka adapter without any dependencies. The other is the only including the dependencies. Why it doesn't appear in Kafka, I currently have no idea ... I could possibly have a look on one of my next PLC4X friday afternoons ... however will be 2-3 weeks I guess before I have time for that. Hopefully someone else will be able to be of assistance. |
The plugin will appear at a location like that once it's released ... till then the only option you have is to get it from the SNAPSHOT repo: https://repository.apache.org/content/groups/snapshots/org/apache/plc4x/plc4j-apache-kafka/0.13.0-SNAPSHOT/ |
Hello @chrisdutz, Thanks for the quick response. I understand, I'll test it here and try to figure it out. Currently, I have managed to make it appear in Kafka by changing the command from However, if I may, I have one more question for you. I'm encountering the following error when running a source connector: Does this error look familiar to you, or do you have any idea what might be wrong? I tried adding other dependencies related to
Thank you in advance for your help! |
First of all ... all important plugin executions are bound to the package phase ... so switcing from intall to package should just skip the installation goal step, which should have no effect on what's in "target", but if it works, that's good ;-) The error you are getting comes from stopping the scraper without having successfully started it. This could be because it never was started, or there was an error starting it ... if you find any log output "Error starting the scraper" that should provide some more information on that. |
Hello @chrisdutz, I've tested everything with DEBUG mode enabled, but I still don't see any errors related to the scraper. It's always the same issue as mentioned before. Could the problem be related to the environment I'm using to build it? Currently, I am using Apache Maven 3.6.3 and Java version 17.0.12. In Kafka, I am using the .zip file produced in Is there a better way to improve logging in the PLC4x connector? Thank you! |
Hello @chrisdutz, I discovered something interesting. I'm currently running the connector code locally to test it, and the error remains the same. The issue is triggered when executing:
Somehow, this trigger is not functioning correctly, and it always produces the error: "Exception in thread 'main' java.lang.NoClassDefFoundError: com/fasterxml/jackson/dataformat/yaml/YAMLFactory." I suspect this might be related to the use of the Interestingly, if I remove all test scopes when running the code, the previous FasterXML error disappears (because jackson-dataformat is just in test scope), the scraping starts, it attempts to poll but then the buffer is empty, not pooling anything. (Obs. It is possible to see in Prosys that it connected with it and opened a channel, however it doesen't poll anything)
After some time, the scraper just stops:
Do you have any ideas on what might be causing this issue? :) Thanks a lot in advance. Best, |
Yould you please add which version of Kafka you are using? |
Hello @chrisdutz, I am using Confluent Platform 7.6.1 with Apache Kafka 3.6.x. However, I don't think that is the problem, because I took that into consideration when compiling the connector. And I am also experiencing the same issue even when using the connector without Kafka. Below is the code example where I am testing the connector outside of Kafka: package org.apache.plc4x.kafka;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.*;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.plc4x.kafka.Plc4xSourceConnector;
import org.apache.plc4x.kafka.Plc4xSourceTask;
import org.apache.plc4x.kafka.config.Constants;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConnectorTest {
private static final Logger log = LoggerFactory.getLogger(ConnectorTest.class);
public static void main(String[] args) {
// Simulate Kafka Connect properties
Map<String, String> props = new HashMap<>();
props.put("tasks.max", "1");
props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("default.topic", "DefaultTopic");
props.put("sources", "machineA");
props.put("sources.machineA.connectionString", "opcua:tcp://Yoshi.lab.mtu-digilab.io:53530/OPCUA/SimulationServer?discovery=false");
props.put("sources.machineA.pollReturnInterval", "500");
props.put("sources.machineA.bufferSize", "1000");
props.put("sources.machineA.jobReferences.simulated-dashboard.topic", "simulated-dashboard-topic");
props.put("jobs", "simulated-dashboard");
props.put("jobs.simulated-dashboard.interval", "1000");
props.put("jobs.simulated-dashboard.tags", "running");
props.put("jobs.simulated-dashboard.tags.running", "ns=3;i=1002");
props.put("sources.machineA.jobReferences", "simulated-dashboard");
log.debug("Starting PLC4X Kafka Connector with properties: {}", props);
// Instantiate your connector
Plc4xSourceConnector connector = new Plc4xSourceConnector();
connector.start(props);
// Get task configurations
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
if (taskConfigs.isEmpty()) {
log.error("No task configurations found. Please check the configuration.");
return;
}
// Instantiate task
Plc4xSourceTask task = new Plc4xSourceTask();
System.out.println("task" + taskConfigs.get(0));
task.start(taskConfigs.get(0));
// Simulate polling
List<SourceRecord> records = task.poll();
if (records != null && !records.isEmpty()) {
for (SourceRecord record : records) {
System.out.println("Polled Record: " + record);
}
} else {
System.out.println("No records polled.");
}
// Stop task and connector
task.stop();
connector.stop();
}
} Thanks! I hope we can find where this problem comes from :) Best, |
Hello @chrisdutz, I would like to add the following point: I tried changing the versions of PLC4x to 0.12.0 in the
UPDATE: By adding all missing libraries, the connector does work with the release 0.12.0. However, it does not work with the 0.13.0-SNAPSHOT. Right now, iI guess that the issue might be related to the compilation or creation of the connector. It does not make any sense Do you have any suggestions on how to fix this? Or any prerequisite to compile the connector. Thanks again for all your support! |
Hello @chrisdutz, Sorry for continuing this thread... I've tracked down the problem, and it appears to be coming from the scraper when it tries to connect:
This occurs exactly at the part where it tries to establish the connection, and it doesn't work: public static PlcConnection getPlcConnection(PlcConnectionManager plcConnectionManager,
String connectionString,
ExecutorService executorService,
long requestTimeoutMs,
String info) throws InterruptedException, ExecutionException, TimeoutException {
if (!info.isEmpty() && LOGGER.isTraceEnabled()) {
LOGGER.trace("Additional Info from caller {}", info);
}
CompletableFuture<PlcConnection> future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("Attempting to get a connection to: " + connectionString);
PlcConnection connection = plcConnectionManager.getConnection(connectionString);
System.out.println("Connection object created successfully: " + connection);
return connection;
} catch (Exception e) {
LOGGER.warn("Unable to instantiate connection to " + connectionString, e);
System.out.println("Unable to instantiate connection to: " + connectionString + ", exception: " + e);
e.printStackTrace(); // Print the full stack trace to stdout for debugging
throw new PlcRuntimeException(e);
}
}, executorService); It gets stuck on this specific line: PlcConnection connection = plcConnectionManager.getConnection(connectionString); The strange part is that with release 0.12.0, this error does not occur, and everything works fine. Another odd thing is that I can see in Prosys that the connection is opened, and it also seems to renew the channel, but it does not pull the data. At this point, I still don't know how to fix the issue. If you have any suggestions, I would greatly appreciate it :) All the best, |
Hello @chrisdutz, I’ve found some additional logs that may be useful, but I still haven’t identified the root cause of the problem. As you can see, a request is made and a channel is successfully created:
However, after a lot of logging, the connection is established but no data is received. Eventually, it times out:
Do you think this issue could be related to an update, or is there something else I might be overlooking? If you have any suggestions, just let me know :) Thanks! |
Hello @chrisdutz, I have some updates regarding the issue. I could find the problem and do a quick fix. In
I fixed it by hard-coding the request timeout, and it worked for about 5 minutes, but then it started creating sessions every 2 seconds (see the image below). This happens in both with and without security. Do you know why the values from the OPC UA driver don’t seem to pass to the scraper? I’d like to fix this for the community, but I’m still a bit lost with all the dependencies, which makes testing difficult. Also, do you have any idea what’s causing the session issue? I don’t see this problem when using OPC UA Java by itself, only when using it with the Kafka connector. I’d like to resolve this for the community too, but I’m still unsure where the issue lies. Thanks, and sorry for the continued messages, reply whenever you have time :) Best regards, |
Hey, @gustavoschewinski , my Kafka connector has the same issue when compiling like u did. Did u find any solutions? thx |
Hello @fernandokendy, unfortunately it's still not working. I found a fix for the timeout issue as said before, but now there's another session bug occurring. At the moment, I don't have many clues about where the problem might be |
Thanks for all of your work on the issue ... Unfortunately I'm not a pro using Kafka so I'm a bit hesitant to invest the 4h per week that I get to work on this ... perhaps there's someone on the community that has the expertiese to help? |
To update everyone, I recently got it working, but it's not the ideal solution :) I used version 0.12.0 from github to compile a working Kafka connector and implemented the necessary updates myself, mainly adding/changing some channel renewal features for OPCUA. The rest of the protocols seem to work fine with the "old" version. However, something is causing some issues when compiling with the 0.13.0-SNAPSHOT version, and I haven't been able to find a fix. I still make myself available to help fix the new version with some guidance, since I'm quite lost... |
head should compile again, although there are test failures related to NIFI |
I didn't express myself clearly earlier, sorry. The project compiles normally in the 0.13.0-SNAPSHOT, but the connector doesn't work because of a problem in "TriggeredScraperImpl" when running it. As I said before the |
maybe @splatch has an idea |
@gustavoschewinski can you push your changes and submit PR, so I can try building your changes and see how it fails? I think it will be most effective way. I believe that we should be able to get a snapshot version of driver within plc4x-extras, once we configure ASF snapshot repository. |
@splatch sorry for the delay, but it does compile. |
@gustavoschewinski no worries. I see three reports - first about signature/runtime linkage errors, second about timeout errors in scraper, third about re-connections. I am not entirely sure which we need to address in order to solve initial kafka connectivity. Maybe it will be best to address them one by one? |
Sounds good. I just built the latest PLC4X for Kafka and got it running using the package from
It’s weird because this issue didn’t happen with version 0.12.0. To fix it, I added this dependency: <dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.17.2</version>
</dependency> After rebuilding and running Kafka again, I got another error:
This is the same error I saw earlier. |
Hi Gustavo, from the line throwing the error it seems that you are passing invalid data-structures to the scraper ... however from my perspective I can't really see which one is bad (or if both source and job-configurations are bad) ... I could add some code to possibly do some more checks and output an error message that is easier to understand. |
@chrisdutz, that’s possible, but I’m using the same configuration for the connector that works with versions v0.12.0 and v0.13.0-SNAPSHOT from the Maven repository. The errors only occur when rebuilding from the current HEAD branch and using the connector in Kafka. Is it possible that the connector configuration has changed? Obs: Ive already got the connector working with a updated v0.12.0 and everything I need, so there’s no rush. However, I’m happy to help in finding a solution for the community |
What happened?
Hello,
I have a question rather than a bug report.
Is it possible to compile plc4j-apache-kafka with newer versions of the OPC-UA and S7 drivers? If so, is there any documentation available on how to do this? I'm willing to compile it and make the updated version of the Kafka connector available for everyone to use.
The reason I ask is that I need to use version 0.13.0-SNAPSHOT for both drivers.
Thank you!
Best regards,
Gustavo.
Version
v.0.12.0
Programming Languages
Protocols
The text was updated successfully, but these errors were encountered: