Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using rxjs/webSocket on a server #5385

Open
kevmo314 opened this issue Apr 10, 2020 · 16 comments
Open

Using rxjs/webSocket on a server #5385

kevmo314 opened this issue Apr 10, 2020 · 16 comments

Comments

@kevmo314
Copy link

kevmo314 commented Apr 10, 2020

Feature Request

Right now WebSocketSubject only accepts a constructor. Can this be changed to also accept an existing socket in the config?

Specifically, my use case is that I'd like to use rxjs with websocket/ws on my server, and their API would allow me to do something like:

wss.on("connection", (ws: WebSocket) => {
  const rxjsWebSocket = webSocket(ws); // Wrap the node WebSocket into a subject.
});

wrt api compatibility, I think it's reasonable to assume the web socket remains w3c compatible instead of worrying about the NodeJS.WebSocket API. There are various compatibility shims to adapt the assorted NodeJS variants to w3c sockets. I'm curious if this was an intentional choice to not allow wrapping an existing socket or if this can be done?

@six-edge
Copy link

six-edge commented Jan 16, 2021

I'm working on making a transparent WebSocket layer for connecting a browser to a remote server (Raspberry Pi) to be able to subscribe directly to sensors and set outputs etc. from the browser or front-end application. So the idea is to have a WebSocket server that handles incoming connections and subscribes in a seamless manner to the GPIO ports.

This would be a welcomed feature!

@abarke
Copy link

abarke commented Jan 16, 2021

@kevmo314 Seems there is a way to do this already:

const WebSocket = require('ws')
const { webSocket } = require('rxjs/webSocket')

const webSocketConnection$ = (
  webSocket({
    protocol: 'v1',
    url: 'http://example.com:3000',
    WebSocketCtor: WebSocket,
  })
)

Ref: https://gist.github.com/Sawtaytoes/fe3d16b1a15aa20eef5d2a41d0b39934
See: https://rxjs.dev/api/webSocket/WebSocketSubjectConfig#WebSocketCtor

@kevmo314
Copy link
Author

@abarke This doesn't work for existing web sockets. I mentioned this in the first line of the FR.

@abarke
Copy link

abarke commented Jan 19, 2021

@kevmo314 apologies I see your point. You are right. An already established WebSocket would allow a new WebSocketSubject to be created for each incoming WS connection. The WebSocketSubject would then be used to send and receive messages on the server-side. I suppose one could then subscribe each WebSocketSubject to a shared resource ('hardware input, data stream, logs, etc.') e.g. ResourceSubject that then emits updated values and multicasting it to all subscribers (WebSocket Clients).

I created a demo to simulate a shared resource on a server just to wrap my head around it, as I need something similar for a project: https://stackblitz.com/edit/rxjs-shared-source-example?file=index.ts

It might be simple enough to modify the WebSocketSubject to take a WebSocket as an argument. I might just give it a hack 🙂

@six-edge
Copy link

Hey @kevmo314 I managed to create a PoC and published the library on NPM for consumption. I rewrote the WebSocketSubject class as extending it proved difficult due to not being able to set private members of the WebSocketSubject. Works great in my project so far. Any feedback/contributions welcome.

https://www.npmjs.com/package/rxjs-websocket-server-subject

@six-edge
Copy link

six-edge commented Mar 8, 2021

Any feedback on this @kevmo314 ?

Im looking at opening a PR for this, however I'm not sure if I should merge the WebSocketServerSubject into the WebSocketSubject (could get messy) or leave it as its own class (some duplicate code, but has separation of concerns and is more maintainable IMHO)

@benlesh any ideas or feedback of how to proceed?

@kevmo314
Copy link
Author

kevmo314 commented Mar 8, 2021

Unfortunately my use case has come and gone, the code that needed this is now happily running and stable. I took a look at the package though, it looks pretty good and I'd love to see this PR'd into the main rxjs/webSocket package. One edge case I'm curious about, what happens if there are two subjects on a single socket and one closes due to backpressure? What happens to the other subject?

@six-edge
Copy link

six-edge commented Apr 6, 2021

@kevmo314 regarding two subjects on a single socket... do you mean something like this?

wsServer.on('connection', (webSocket: WebSocket, req: IncomingMessage) => {
  const wsServerSubject = new WebSocketServerSubject<Message>(webSocket)
  const wsServerSubject2 = new WebSocketServerSubject<Message>(webSocket)

I tried this but it seems only wsServerSubject2 is bound. Not sure what your specific use case is, however in my test project I just create a single wsServerSubject and pipe various listeners to that subject that filter a specific message for each message type emitted. Could you elaborate on your use case as to why having two subjects on a single socket would be useful?

@ValentinBossi
Copy link

what about using fromEvent? From then on its possible to do everything right?

const wss = new Server({ server });

const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason)));
      // const onerror$ = ....
      // etc
  return merge(onmessage$, onclose$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();
}

@noririco
Copy link

what about using fromEvent? From then on its possible to do everything right?

const wss = new Server({ server });

const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason)));
      // const onerror$ = ....
      // etc
  return merge(onmessage$, onclose$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();
}

This is very nice idea, did you try it ?
once you go rxjs you need it to the vain

@ValentinBossi
Copy link

what about using fromEvent? From then on its possible to do everything right?

const wss = new Server({ server });

const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason)));
      // const onerror$ = ....
      // etc
  return merge(onmessage$, onclose$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();
}

This is very nice idea, did you try it ?
once you go rxjs you need it to the vain

sure. in the moment one message type is looking like that:

const onConvertMessage$ = fromEvent(websocket, 'message')
        .pipe(
            map(m => WsExchange.fromMessage(JSON.parse((m as WsMessage).data))),
            filter(m => m.action === "convert"),
            map(m => m as ConvertText),
            switchMap(handleConvertion),
            mergeMap(uploadToAws),
            map(bucket => GeneratedAudio.from(bucket)),
            tap(message => websocket.send(JSON.stringify(message))),
            catchError((err, caught) => {
                console.log("error in convert pipeline: ", err)
                return caught;
            }),
        );

@bever1337
Copy link

bever1337 commented Sep 30, 2022

I don't think this makes sense as part of rxjs because the change is to support a userland socket implementation that only works in a node environment. It would be a large commitment for RxJS development to follow ws development. For example, what versions should RxJS pin as a peer dependency? Who will test changes? ws is my preferred tooling, and this package looks like a great implementation, so why not continue to use this as a package?

If it is included, it should not be part of the dom observable module because the browser runtime is not supported.

@kevmo314
Copy link
Author

It would be a large commitment for RxJS development to follow ws development.

This is not true, as mentioned in the original issue the request is to follow the w3c specification for which the ws websocket is compliant. The change is to support passing in a spec-compliant websocket which also exists in browsers, not a specifically nodejs runtime websocket.

@bever1337
Copy link

bever1337 commented Oct 1, 2022

The client is spec-compliant in that it can talk to other spec-compliant sockets, absolutely, but the API cannot be used in the browser. For example, the node websocket allows the user to implement ping/pong, whereas browser websockets handle ping/pong automatically. Does RxJS need to expose additional streams on the subject? IMO yes, if the goal is to fully support native sockets.

If the goal is only to pass in a web socket, why not create a dynamic constructor as-needed? Someone suggested this earlier, but without creating a constructor on-the-fly. We could leverage variable scope, for example:

const nativeSocket = /* native `ws` created socket reference */;
const WebsocketCtor = function LocalCtor(url: string, protocol: string[]) {
  this = nativeSocket;
};
const rxjsSocket = new WebSocketSubject({
  WebSocketCtor,
});

In this situation, RxJS won't attach listeners until the developer subscribes to the new subject. It would be possible for the socket to emit events that are lost and de-sync RxJS state. For example, the openObserver might not get called. I need to review your changes better to understand how retry logic works. For example: By passing in a socket instead of a config, how would RxJS know the correct origin or protocol for the new socket?

@bever1337
Copy link

I'm writing an additional comment instead of more edits 😂
@kevmo314 is the intention that your package could be merged right over the WebSocketSubject code? How can we review this as a PR? I'd love to get a proper diff so I can better understand your idea.

@lropero
Copy link

lropero commented Aug 13, 2024

what about using fromEvent? From then on its possible to do everything right?

const wss = new Server({ server });

const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason)));
      // const onerror$ = ....
      // etc
  return merge(onmessage$, onclose$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();
}

Hi, I'm trying to access message's data in handleConnection's observables:

import express from 'express'
import { createServer } from 'http'
import { fromEvent, merge } from 'rxjs'
import { map, mergeMap, tap } from 'rxjs/operators'
import { WebSocketServer } from 'ws'

const app = express()
const server = createServer(app)
const wss = new WebSocketServer({ server })

const handleConnection = socket => {
  const onClose$ = fromEvent(socket, 'close').pipe(tap(value => console.log('close', value)))
  const onMessage$ = fromEvent(socket, 'message').pipe(tap(value => console.log('message', value)))
  return merge(onClose$, onMessage$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(value => value[0]),
    mergeMap(handleConnection)
  )
  .subscribe()

...but instead I'm getting ws's event target formatted messages, like so:

close CloseEvent {
  [Symbol(kTarget)]: <ref *1> WebSocket {
    _events: [Object: null prototype] {
      close: [Array],
      error: [Function],
      message: [Function],
      open: [Function]
    },
    _eventsCount: 4,
    _maxListeners: undefined,
    _binaryType: 'nodebuffer',
    _closeCode: 1005,
    _closeFrameReceived: true,
    _closeFrameSent: true,
    _closeMessage: <Buffer >,
    _closeTimer: Timeout {
      _idleTimeout: -1,
      _idlePrev: null,
      _idleNext: null,
      _idleStart: 2493,
      _onTimeout: null,
      _timerArgs: undefined,
      _repeat: null,
      _destroyed: true,
      [Symbol(refed)]: true,
      [Symbol(kHasPrimitive)]: false,
      [Symbol(asyncId)]: 43,
      [Symbol(triggerId)]: 35
    },
    _errorEmitted: false,
    _extensions: {},
    _paused: false,
    _protocol: '',
    _readyState: 3,
    _receiver: Receiver {
      _events: [Object: null prototype] {},
      _writableState: [WritableState],
      _maxListeners: undefined,
      _allowSynchronousEvents: true,
      _binaryType: 'nodebuffer',
      _extensions: {},
      _isServer: true,
      _maxPayload: 104857600,
      _skipUTF8Validation: false,
      _bufferedBytes: 0,
      _buffers: [],
      _compressed: false,
      _payloadLength: 0,
      _mask: <Buffer b0 dc 14 ca>,
      _fragmented: 0,
      _masked: true,
      _fin: true,
      _opcode: 8,
      _totalPayloadLength: 0,
      _messageLength: 0,
      _fragments: [],
      _errored: false,
      _loop: false,
      _state: 0,
      _eventsCount: 0,
      [Symbol(shapeMode)]: false,
      [Symbol(kCapture)]: false,
      [Symbol(websocket)]: [Circular *1]
    },
    _sender: Sender {
      _extensions: {},
      _socket: [Socket],
      _firstFragment: true,
      _compress: false,
      _bufferedBytes: 0,
      _queue: [],
      _state: 0,
      onerror: [Function: senderOnError],
      [Symbol(websocket)]: [Circular *1]
    },
    _socket: Socket {
      connecting: false,
      _hadError: false,
      _parent: null,
      _host: null,
      _closeAfterHandlingError: false,
      _events: [Object],
      _readableState: [ReadableState],
      _writableState: [WritableState],
      allowHalfOpen: true,
      _maxListeners: undefined,
      _eventsCount: 2,
      _sockname: null,
      _pendingData: null,
      _pendingEncoding: '',
      server: [Server],
      _server: [Server],
      parser: null,
      on: [Function (anonymous)],
      addListener: [Function (anonymous)],
      prependListener: [Function: prependListener],
      setEncoding: [Function: socketSetEncoding],
      _paused: false,
      timeout: 0,
      [Symbol(async_id_symbol)]: 35,
      [Symbol(kHandle)]: null,
      [Symbol(lastWriteQueueSize)]: 0,
      [Symbol(timeout)]: null,
      [Symbol(kBuffer)]: null,
      [Symbol(kBufferCb)]: null,
      [Symbol(kBufferGen)]: null,
      [Symbol(shapeMode)]: true,
      [Symbol(kCapture)]: false,
      [Symbol(kSetNoDelay)]: true,
      [Symbol(kSetKeepAlive)]: false,
      [Symbol(kSetKeepAliveInitialDelay)]: 0,
      [Symbol(kBytesRead)]: 229,
      [Symbol(kBytesWritten)]: 131,
      [Symbol(websocket)]: undefined
    },
    _autoPong: true,
    _isServer: true,
    [Symbol(shapeMode)]: false,
    [Symbol(kCapture)]: false
  },
  [Symbol(kType)]: 'close',
  [Symbol(kCode)]: 1005,
  [Symbol(kReason)]: '',
  [Symbol(kWasClean)]: true
}

How can I fix this?
Thanks.-

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

7 participants