Skip to content

Commit

Permalink
Merge pull request #85 from rdf-connect/v0.1.1
Browse files Browse the repository at this point in the history
v0.1.1
  • Loading branch information
julianrojas87 authored Feb 17, 2025
2 parents 34f2274 + 343f86e commit 550bbeb
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 126 deletions.
127 changes: 9 additions & 118 deletions lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,20 @@ import { FileStateFactory, NoStateFactory, StateFactory } from "./state";
import { CBDShapeExtractor } from "extract-cbd-shape";
import { RdfStore } from "rdf-stores";
import { DataFactory } from "rdf-data-factory";
import { Writer as NWriter } from "n3";
import { Quad_Object, Term } from "@rdfjs/types";
import { Term } from "@rdfjs/types";
import {
enhanced_fetch,
extractMainNodeShape,
getObjects,
handleConditions,
maybeVersionMaterialize,
ModulatorFactory,
Notifier,
processConditionFile,
streamToArray,
} from "./utils";
import { LDES, SDS, TREE } from "@treecg/types";
import { LDES, TREE } from "@treecg/types";
import { FetchedPage, Fetcher, longPromise, resetPromise } from "./pageFetcher";
import { Manager } from "./memberManager";
import { OrderedStrategy, StrategyEvents, UnorderedStrategy } from "./strategy";
import type { Writer } from "@rdfc/js-runner";
import { getLoggerFor } from "./utils/logUtil";

export { intoConfig } from "./config";
Expand Down Expand Up @@ -279,6 +275,10 @@ export class Client {
}
}

// This is the ID of the stream of data we are replicating.
// Normally it corresponds to the actual LDES IRI, unless externally specified.
this.streamId = this.streamId || viewQuads[0].subject;

const info = await getInfo(
ldesId,
viewId,
Expand Down Expand Up @@ -311,12 +311,10 @@ export class Client {
)
: undefined;

this.streamId = this.streamId || viewQuads[0].subject;

this.memberManager = new Manager(
isLocalDump
? null // Local dump does not have a stream id
: this.streamId || viewQuads[0].subject,
? null // Local dump does not need to dereference a view
: viewQuads[0].subject, // Point to the actual LDES IRI
state.item,
info,
);
Expand Down Expand Up @@ -503,111 +501,4 @@ async function fetchPage(
data.import(resp.data).on("end", resolve).on("error", reject);
});
return <FetchedPage>{ url, data };
}

/**
 * Processor entry point: replicates the LDES found at `url` and pushes each
 * received member, serialized through an N3 writer, into `writer`.
 *
 * The positional arguments are handed to `replicateLDES` under these option
 * names: `loose`, `noShape`, `shape` -> `shapeFile`, `follow` -> `polling`,
 * `url`, `save` -> `stateFile`, `pollInterval`, `urlIsView`, `after`, `before`,
 * `materialize`, `lastVersionOnly`. `condition` is first passed through
 * `processConditionFile`. `ordered` is used as the ordering argument and falls
 * back to "none" when falsy.
 *
 * When `fetch_config.auth` is present its `host` is overwritten in place with
 * the host of `url`; when `fetch_config` is present the fetch implementation
 * is built with `enhanced_fetch`, otherwise the global `fetch` is used.
 *
 * @returns an async function that drains the member stream until it reports
 *          `done`, pushing one serialized string per member.
 */
export async function processor(
  writer: Writer<string>,
  url: string,
  before?: Date,
  after?: Date,
  ordered?: string,
  follow?: boolean,
  pollInterval?: number,
  shape?: string,
  noShape?: boolean,
  save?: string,
  loose?: boolean,
  urlIsView?: boolean,
  fetch_config?: {
    auth?: {
      type: "basic";
      auth: string;
      host: string;
    };
    concurrent?: number;
    retry?: {
      codes: number[];
      maxRetries: number;
    };
  },
  condition?: string,
  materialize?: boolean,
  lastVersionOnly?: boolean,
) {
  const logger = getLoggerFor("processor");

  // The auth host always follows the URL being replicated.
  const authSettings = fetch_config?.auth;
  if (authSettings) {
    authSettings.host = new URL(url).host;
  }

  // Resolved in the same order as the option list: fetch first, condition last.
  const fetcher = fetch_config ? enhanced_fetch(fetch_config) : fetch;
  const conditionRules = await processConditionFile(condition);

  const client = replicateLDES(
    {
      loose,
      noShape,
      shapeFile: shape,
      polling: follow,
      url,
      stateFile: save,
      pollInterval,
      urlIsView,
      after,
      before,
      fetch: fetcher,
      materialize,
      lastVersionOnly,
      condition: conditionRules,
    },
    (ordered as Ordered) || "none",
  );

  client.on("fragment", () => logger.verbose("Fragment!"));

  const memberReader = client.stream({ highWaterMark: 10 }).getReader();

  // Once the output channel ends there is nobody left to receive members.
  writer.on("end", async () => {
    await memberReader.cancel();
    logger.info("Writer closed, so closing reader as well.");
  });

  return async () => {
    const seenIds = new Set();

    for (
      let chunk = await memberReader.read();
      ;
      chunk = await memberReader.read()
    ) {
      const member = chunk.value;

      if (member) {
        seenIds.add(member.id);

        if (seenIds.size % 100 == 1) {
          logger.verbose(
            `Got member ${seenIds.size} with ${member.quads.length} quads`,
          );
        }

        // Attach an SDS record (stream + payload) in the DataDescription graph
        // to a copy of the member quads.
        const record = df.blankNode();
        const streamQuad = df.quad(
          record,
          SDS.terms.stream,
          client.streamId! as Quad_Object,
          SDS.terms.custom("DataDescription"),
        );
        const payloadQuad = df.quad(
          record,
          SDS.terms.payload,
          member.id! as Quad_Object,
          SDS.terms.custom("DataDescription"),
        );
        const output = [...member.quads, streamQuad, payloadQuad];

        await writer.push(new NWriter().quadsToString(output));
      }

      // A final chunk may still carry a value, so `done` is checked afterwards.
      if (chunk.done) {
        break;
      }
    }

    logger.verbose(`Found ${seenIds.size} members`);
  };
}
}
123 changes: 123 additions & 0 deletions lib/rdfc-processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import { getLoggerFor } from "./utils/logUtil";
import { replicateLDES } from "./client";
import { enhanced_fetch, processConditionFile } from "./utils";
import { DataFactory } from "rdf-data-factory";
import { SDS } from "@treecg/types";
import { Writer as NWriter } from "n3";

import type { Writer } from "@rdfc/js-runner";
import type { Quad_Object } from "@rdfjs/types";
import type { Ordered } from "./client";

const df = new DataFactory();

/**
 * RDF-Connect processor entry point for the LDES client.
 *
 * Builds a client with `replicateLDES`, opens a reader on its member stream
 * and returns an async function that pushes every member to `writer` as a
 * string produced by an N3 writer. Each pushed string contains a copy of the
 * member quads plus two extra quads in the `DataDescription` graph that link
 * a fresh blank node to the stream id (`sds:stream`) and the member id
 * (`sds:payload`).
 *
 * @param writer Output channel; when it emits "end" the member reader is cancelled.
 * @param url URL handed to `replicateLDES`; also the source of the auth host.
 * @param before Forwarded as the `before` option.
 * @param after Forwarded as the `after` option.
 * @param ordered Ordering argument; falls back to "none" when falsy.
 * @param follow Forwarded as the `polling` option.
 * @param pollInterval Forwarded as the `pollInterval` option.
 * @param shape Forwarded as the `shapeFile` option.
 * @param noShape Forwarded as the `noShape` option.
 * @param save Forwarded as the `stateFile` option.
 * @param loose Forwarded as the `loose` option.
 * @param urlIsView Forwarded as the `urlIsView` option.
 * @param fetch_config When given, the fetch implementation is built with
 *   `enhanced_fetch`; otherwise the global `fetch` is used. NOTE: if `auth` is
 *   present, its `host` is overwritten in place with the host of `url`.
 * @param condition Passed through `processConditionFile` before being forwarded.
 * @param materialize Forwarded as the `materialize` option.
 * @param lastVersionOnly Forwarded as the `lastVersionOnly` option.
 * @param streamId Optional IRI; when set it is turned into a named node and
 *   passed to `replicateLDES` as the stream identifier.
 * @returns An async function that drains the member stream until it is done.
 */
export async function processor(
  writer: Writer<string>,
  url: string,
  before?: Date,
  after?: Date,
  ordered?: string,
  follow?: boolean,
  pollInterval?: number,
  shape?: string,
  noShape?: boolean,
  save?: string,
  loose?: boolean,
  urlIsView?: boolean,
  fetch_config?: {
    auth?: {
      type: "basic";
      auth: string;
      host: string;
    };
    concurrent?: number;
    retry?: {
      codes: number[];
      maxRetries: number;
    };
  },
  condition?: string,
  materialize?: boolean,
  lastVersionOnly?: boolean,
  streamId?: string,
): Promise<() => Promise<void>> {
  const logger = getLoggerFor("processor");

  // The auth host always follows the URL being replicated.
  if (fetch_config?.auth) {
    fetch_config.auth.host = new URL(url).host;
  }

  const client = replicateLDES(
    {
      loose,
      noShape,
      shapeFile: shape,
      polling: follow,
      url: url,
      stateFile: save,
      pollInterval: pollInterval,
      urlIsView,
      after,
      before,
      fetch: fetch_config ? enhanced_fetch(fetch_config) : fetch,
      materialize,
      lastVersionOnly,
      condition: await processConditionFile(condition),
    },
    <Ordered>ordered || "none",
    undefined,
    streamId ? df.namedNode(streamId) : undefined,
  );

  client.on("fragment", () => logger.verbose("Fragment!"));

  const reader = client.stream({ highWaterMark: 10 }).getReader();

  writer.on("end", async () => {
    // The event emitter does not observe this promise, so a rejected cancel()
    // must be handled here or it surfaces as an unhandled rejection.
    try {
      await reader.cancel();
      logger.info("Writer closed, so closing reader as well.");
    } catch (error: unknown) {
      const reason = error instanceof Error ? error.message : String(error);
      logger.error(`Could not close reader after writer closed: ${reason}`);
    }
  });

  return async () => {
    // A plain counter instead of a Set of ids: only the count is ever used,
    // and retaining every id would grow without bound on a followed stream.
    let memberCount = 0;
    const dataDescription = SDS.terms.custom("DataDescription");

    for (;;) {
      const { value: member, done } = await reader.read();

      if (member) {
        memberCount += 1;

        if (memberCount % 100 === 1) {
          logger.verbose(
            `Got member ${memberCount} with ${member.quads.length} quads`,
          );
        }

        // Work on a copy so the member's own quad array is left untouched.
        const blank = df.blankNode();
        const quads = member.quads.slice();
        quads.push(
          df.quad(
            blank,
            SDS.terms.stream,
            <Quad_Object>client.streamId!,
            dataDescription,
          ),
          df.quad(
            blank,
            SDS.terms.payload,
            <Quad_Object>member.id!,
            dataDescription,
          ),
        );

        await writer.push(new NWriter().quadsToString(quads));
      }

      // A final chunk may still carry a value, so `done` is checked afterwards.
      if (done) {
        break;
      }
    }

    logger.verbose(`Found ${memberCount} members`);
  };
}
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "ldes-client",
"description": "This package provides common tooling to work with LDESes.",
"version": "0.1.0",
"version": "0.1.1",
"main": "dist/lib/client.js",
"exports": {
"import": "./dist/lib/client.js",
Expand Down
11 changes: 10 additions & 1 deletion processor.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
js:LdesClient a js:JsProcess;
dc:title "ldes client";
dc:description "";
js:file <./dist/lib/client.js>;
js:file <./dist/lib/rdfc-processor.js>;
js:function "processor";
js:location <./>;
js:mapping [
Expand Down Expand Up @@ -79,6 +79,10 @@ js:LdesClient a js:JsProcess;
a fnom:PositionParameterMapping;
fnom:functionParameter "lastVersionOnly";
fnom:implementationParameterPosition "15"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "streamId";
fnom:implementationParameterPosition "16"^^xsd:int;
];
].

Expand Down Expand Up @@ -218,5 +222,10 @@ js:LdesClient a js:JsProcess;
sh:path js:lastVersionOnly;
sh:name "lastVersionOnly";
sh:maxCount 1;
], [
sh:datatype xsd:string;
sh:path js:streamId;
sh:name "streamId";
sh:maxCount 1;
].

3 changes: 2 additions & 1 deletion tests/rdf-connect/ldes-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import { fastifyStatic } from "@fastify/static";
import { Parser } from "n3";
import { RdfStore } from "rdf-stores";
import { DataFactory } from "rdf-data-factory";
import { processor, replicateLDES } from "../../lib/client";
import { replicateLDES } from "../../lib/client";
import { processor } from "../../lib/rdfc-processor";
import { createUriAndTermNamespace, RDF, SDS, DC } from "@treecg/types";
import { Stream } from "stream";

Expand Down
9 changes: 6 additions & 3 deletions tests/rdf-connect/processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ describe("Tests for js:LdesClient processor", async () => {
];
js:conditionFile </path/to/condition.ttl>;
js:materialize true;
js:lastVersionOnly true.
js:lastVersionOnly true;
js:streamId "MyStream".
`;

const source: Source = {
Expand All @@ -72,7 +73,7 @@ describe("Tests for js:LdesClient processor", async () => {

const argss = extractSteps(env, quads, config);
expect(argss.length).toBe(1);
expect(argss[0].length).toBe(16);
expect(argss[0].length).toBe(17);

const [
[
Expand All @@ -91,7 +92,8 @@ describe("Tests for js:LdesClient processor", async () => {
fetch_config,
conditionFile,
materialize,
lastVersionOnly
lastVersionOnly,
streamId
],
] = argss;

Expand Down Expand Up @@ -122,6 +124,7 @@ describe("Tests for js:LdesClient processor", async () => {
expect(conditionFile).toBe("/path/to/condition.ttl");
expect(materialize).toBeTruthy();
expect(lastVersionOnly).toBeTruthy();
expect(streamId).toBe("MyStream");

await checkProc(env.file, env.func);
});
Expand Down

0 comments on commit 550bbeb

Please sign in to comment.