Skip to content

Commit f8a49af

Browse files
committed
Implement updated pubsub API
1 parent 69e4383 commit f8a49af

File tree

7 files changed

+27
-12
lines changed

7 files changed

+27
-12
lines changed

lib/cid.jar

-1 Bytes
Binary file not shown.

lib/multiaddr.jar

28 Bytes
Binary file not shown.

lib/multibase.jar

867 Bytes
Binary file not shown.

lib/multihash.jar

-1 Bytes
Binary file not shown.

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
3535
<version.junit>4.13.2</version.junit>
3636
<version.hamcrest>2.2</version.hamcrest>
37-
<version.multiaddr>v1.4.3</version.multiaddr>
37+
<version.multiaddr>v1.4.6</version.multiaddr>
3838
</properties>
3939

4040
<repositories>

src/main/java/io/ipfs/api/IPFS.java

+23-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.ipfs.api;
22

33
import io.ipfs.cid.*;
4+
import io.ipfs.multibase.*;
45
import io.ipfs.multihash.Multihash;
56
import io.ipfs.multiaddr.MultiAddress;
67

@@ -276,16 +277,24 @@ public Object peers(String topic) throws IOException {
276277
* @return
277278
* @throws IOException
278279
*/
279-
public Object pub(String topic, String data) throws Exception {
280-
return retrieveAndParse("pubsub/pub?arg="+topic + "&arg=" + data);
280+
public void pub(String topic, String data) {
281+
String encodedTopic = Multibase.encode(Multibase.Base.Base64Url, topic.getBytes());
282+
Multipart m = new Multipart(protocol +"://" + host + ":" + port + version+"pubsub/pub?arg=" + encodedTopic, "UTF-8");
283+
try {
284+
m.addFilePart("file", Paths.get(""), new NamedStreamable.ByteArrayWrapper(data.getBytes()));
285+
String res = m.finish();
286+
} catch (IOException e) {
287+
throw new RuntimeException(e.getMessage(), e);
288+
}
281289
}
282290

283291
public Stream<Map<String, Object>> sub(String topic) throws Exception {
284292
return sub(topic, ForkJoinPool.commonPool());
285293
}
286294

287295
public Stream<Map<String, Object>> sub(String topic, ForkJoinPool threadSupplier) throws Exception {
288-
return retrieveAndParseStream("pubsub/sub?arg=" + topic, threadSupplier).map(obj -> (Map)obj);
296+
String encodedTopic = Multibase.encode(Multibase.Base.Base64Url, topic.getBytes());
297+
return retrieveAndParseStream("pubsub/sub?arg=" + encodedTopic, threadSupplier).map(obj -> (Map)obj);
289298
}
290299

291300
/**
@@ -295,10 +304,9 @@ public Stream<Map<String, Object>> sub(String topic, ForkJoinPool threadSupplier
295304
* @throws IOException
296305
*/
297306
public void sub(String topic, Consumer<Map<String, Object>> results, Consumer<IOException> error) throws IOException {
298-
retrieveAndParseStream("pubsub/sub?arg="+topic, res -> results.accept((Map)res), error);
307+
String encodedTopic = Multibase.encode(Multibase.Base.Base64Url, topic.getBytes());
308+
retrieveAndParseStream("pubsub/sub?arg="+encodedTopic, res -> results.accept((Map)res), error);
299309
}
300-
301-
302310
}
303311

304312
/* 'ipfs block' is a plumbing command used to manipulate raw ipfs blocks.
@@ -778,7 +786,15 @@ private InputStream retrieveStream(String path) throws IOException {
778786

779787
private static InputStream getStream(URL target, int connectTimeoutMillis, int readTimeoutMillis) throws IOException {
780788
HttpURLConnection conn = configureConnection(target, "POST", connectTimeoutMillis, readTimeoutMillis);
781-
return conn.getInputStream();
789+
try {
790+
return conn.getInputStream();
791+
} catch (IOException e) {
792+
e.printStackTrace();
793+
InputStream errorStream = conn.getErrorStream();
794+
String err = errorStream == null ? e.getMessage() : new String(readFully(errorStream));
795+
List<String> trailer = conn.getHeaderFields().get("Trailer");
796+
throw new RuntimeException("IOException contacting IPFS daemon.\n"+err+"\nTrailer: " + trailer, e);
797+
}
782798
}
783799

784800
private Map postMap(String path, byte[] body, Map<String, String> headers) throws IOException {

src/test/java/io/ipfs/api/APITest.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.io.*;
1010
import java.nio.file.*;
1111
import java.util.*;
12-
import java.util.concurrent.*;
1312
import java.util.function.*;
1413
import java.util.stream.*;
1514

@@ -408,7 +407,7 @@ public void publish() throws Exception {
408407
}
409408

410409
@Test
411-
public void pubsubSynchronous() throws Exception {
410+
public void pubsubSynchronous() {
412411
String topic = "topic" + System.nanoTime();
413412
List<Map<String, Object>> res = Collections.synchronizedList(new ArrayList<>());
414413
new Thread(() -> {
@@ -433,8 +432,8 @@ public void pubsub() throws Exception {
433432
String topic = "topic" + System.nanoTime();
434433
Stream<Map<String, Object>> sub = ipfs.pubsub.sub(topic);
435434
String data = "Hello!";
436-
Object pub = ipfs.pubsub.pub(topic, data);
437-
Object pub2 = ipfs.pubsub.pub(topic, "G'day");
435+
ipfs.pubsub.pub(topic, data);
436+
ipfs.pubsub.pub(topic, "G'day");
438437
List<Map> results = sub.limit(2).collect(Collectors.toList());
439438
Assert.assertTrue( ! results.get(0).equals(Collections.emptyMap()));
440439
}

0 commit comments

Comments
 (0)