Skip to content

Commit

Permalink
feat: move Peer to PeerId (#2246)
Browse files Browse the repository at this point in the history
* feat: move Peer to PeerId

* up tests

* update tests
  • Loading branch information
weboko authored Feb 5, 2025
1 parent 2a7f4b6 commit fc93fae
Show file tree
Hide file tree
Showing 15 changed files with 135 additions and 143 deletions.
6 changes: 3 additions & 3 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Libp2p } from "@libp2p/interface";
import type { Peer, Stream } from "@libp2p/interface";
import type { PeerId, Stream } from "@libp2p/interface";
import type {
IBaseProtocolCore,
Libp2pComponents,
Expand Down Expand Up @@ -38,7 +38,7 @@ export class BaseProtocol implements IBaseProtocolCore {
);
}

protected async getStream(peer: Peer): Promise<Stream> {
return this.streamManager.getStream(peer);
protected async getStream(peerId: PeerId): Promise<Stream> {
return this.streamManager.getStream(peerId);
}
}
50 changes: 25 additions & 25 deletions packages/core/src/lib/filter/filter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Peer, Stream } from "@libp2p/interface";
import type { PeerId, Stream } from "@libp2p/interface";
import type { IncomingStreamData } from "@libp2p/interface-internal";
import {
type ContentTopic,
Expand Down Expand Up @@ -53,10 +53,10 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {

public async subscribe(
pubsubTopic: PubsubTopic,
peer: Peer,
peerId: PeerId,
contentTopics: ContentTopic[]
): Promise<CoreProtocolResult> {
const stream = await this.getStream(peer);
const stream = await this.getStream(peerId);

const request = FilterSubscribeRpc.createSubscribeRequest(
pubsubTopic,
Expand All @@ -78,7 +78,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
peerId: peer.id
peerId: peerId
}
};
}
Expand All @@ -93,36 +93,36 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
return {
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peer.id
peerId: peerId
},
success: null
};
}

return {
failure: null,
success: peer.id
success: peerId
};
}

public async unsubscribe(
pubsubTopic: PubsubTopic,
peer: Peer,
peerId: PeerId,
contentTopics: ContentTopic[]
): Promise<CoreProtocolResult> {
let stream: Stream | undefined;
try {
stream = await this.getStream(peer);
stream = await this.getStream(peerId);
} catch (error) {
log.error(
`Failed to get a stream for remote peer${peer.id.toString()}`,
`Failed to get a stream for remote peer${peerId.toString()}`,
error
);
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
peerId: peer.id
peerId: peerId
}
};
}
Expand All @@ -140,22 +140,22 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
peerId: peer.id
peerId: peerId
}
};
}

return {
success: peer.id,
success: peerId,
failure: null
};
}

public async unsubscribeAll(
pubsubTopic: PubsubTopic,
peer: Peer
peerId: PeerId
): Promise<CoreProtocolResult> {
const stream = await this.getStream(peer);
const stream = await this.getStream(peerId);

const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic);

Expand All @@ -171,7 +171,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
return {
failure: {
error: ProtocolError.NO_RESPONSE,
peerId: peer.id
peerId: peerId
},
success: null
};
Expand All @@ -187,32 +187,32 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
return {
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peer.id
peerId: peerId
},
success: null
};
}

return {
failure: null,
success: peer.id
success: peerId
};
}

public async ping(peer: Peer): Promise<CoreProtocolResult> {
public async ping(peerId: PeerId): Promise<CoreProtocolResult> {
let stream: Stream | undefined;
try {
stream = await this.getStream(peer);
stream = await this.getStream(peerId);
} catch (error) {
log.error(
`Failed to get a stream for remote peer${peer.id.toString()}`,
`Failed to get a stream for remote peer${peerId.toString()}`,
error
);
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
peerId: peer.id
peerId: peerId
}
};
}
Expand All @@ -234,7 +234,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
peerId: peer.id
peerId: peerId
}
};
}
Expand All @@ -244,7 +244,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
success: null,
failure: {
error: ProtocolError.NO_RESPONSE,
peerId: peer.id
peerId: peerId
}
};
}
Expand All @@ -260,12 +260,12 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peer.id
peerId: peerId
}
};
}
return {
success: peer.id,
success: peerId,
failure: null
};
}
Expand Down
23 changes: 11 additions & 12 deletions packages/core/src/lib/light_push/light_push.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Peer, Stream } from "@libp2p/interface";
import type { PeerId, Stream } from "@libp2p/interface";
import {
type CoreProtocolResult,
type IBaseProtocolCore,
Expand Down Expand Up @@ -76,11 +76,10 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
}
}

// TODO(weboko): use peer.id as parameter instead
public async send(
encoder: IEncoder,
message: IMessage,
peer: Peer
peerId: PeerId
): Promise<CoreProtocolResult> {
const { query, error: preparationError } = await this.preparePushMessage(
encoder,
Expand All @@ -92,21 +91,21 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
success: null,
failure: {
error: preparationError,
peerId: peer.id
peerId
}
};
}

let stream: Stream;
try {
stream = await this.getStream(peer);
stream = await this.getStream(peerId);
} catch (error) {
log.error("Failed to get stream", error);
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
peerId: peer.id
peerId: peerId
}
};
}
Expand All @@ -126,7 +125,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
peerId: peer.id
peerId: peerId
}
};
}
Expand All @@ -145,7 +144,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
success: null,
failure: {
error: ProtocolError.DECODE_FAILED,
peerId: peer.id
peerId: peerId
}
};
}
Expand All @@ -156,7 +155,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
success: null,
failure: {
error: ProtocolError.NO_RESPONSE,
peerId: peer.id
peerId: peerId
}
};
}
Expand All @@ -168,7 +167,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
success: null,
failure: {
error: rlnErrorCase,
peerId: peer.id
peerId: peerId
}
};
}
Expand All @@ -179,11 +178,11 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peer.id
peerId: peerId
}
};
}

return { success: peer.id, failure: null };
return { success: peerId, failure: null };
}
}
2 changes: 1 addition & 1 deletion packages/core/src/lib/metadata/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Metadata extends BaseProtocol implements IMetadata {

let stream;
try {
stream = await this.getStream(peer);
stream = await this.getStream(peerId);
} catch (error) {
log.error("Failed to get stream", error);
return {
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/lib/store/store.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Peer } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import {
IDecodedMessage,
IDecoder,
Expand Down Expand Up @@ -38,7 +38,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
public async *queryPerPage<T extends IDecodedMessage>(
queryOpts: QueryRequestParams,
decoders: Map<string, IDecoder<T>>,
peer: Peer
peerId: PeerId
): AsyncGenerator<Promise<T | undefined>[]> {
if (
queryOpts.contentTopics.toString() !==
Expand All @@ -58,7 +58,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore {

let stream;
try {
stream = await this.getStream(peer);
stream = await this.getStream(peerId);
} catch (e) {
log.error("Failed to get stream", e);
break;
Expand Down
10 changes: 5 additions & 5 deletions packages/core/src/lib/stream_manager/stream_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ describe("StreamManager", () => {

streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];

const stream = await streamManager.getStream(mockPeer);
const stream = await streamManager.getStream(mockPeer.id);

expect(stream).not.to.be.undefined;
expect(stream?.id).to.be.eq("1");
Expand All @@ -48,7 +48,7 @@ describe("StreamManager", () => {

let error: Error | undefined;
try {
await streamManager.getStream(mockPeer);
await streamManager.getStream(mockPeer.id);
} catch (e) {
error = e as Error;
}
Expand Down Expand Up @@ -76,7 +76,7 @@ describe("StreamManager", () => {
con1.newStream = newStreamSpy;
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];

const stream = await streamManager.getStream(mockPeer);
const stream = await streamManager.getStream(mockPeer.id);

expect(stream).not.to.be.undefined;
expect(stream?.id).to.be.eq("2");
Expand All @@ -102,8 +102,8 @@ describe("StreamManager", () => {
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];

const [stream1, stream2] = await Promise.all([
streamManager.getStream(mockPeer),
streamManager.getStream(mockPeer)
streamManager.getStream(mockPeer.id),
streamManager.getStream(mockPeer.id)
]);

const expected = ["1", "2"].toString();
Expand Down
Loading

0 comments on commit fc93fae

Please sign in to comment.