server sent events changed in bun v1.1.27 and 1.1.26 #13811

Open
simylein opened this issue Sep 8, 2024 · 14 comments · May be fixed by #13841
Labels: bug (Something isn't working)

simylein (Contributor) commented Sep 8, 2024

What version of Bun is running?

1.1.27+267afa293

What platform is your computer?

Darwin 23.6.0 arm64 arm

What steps can reproduce the bug?

I am using serve to stream server-sent events over HTTP.
In bun v1.1.25 (and before) everything works as expected.
Starting in 1.1.26, I get disconnected after 8 seconds.
In 1.1.27, however, it completely breaks and takes down my Linux server.

I am using the following code to pub/sub over a readable stream.

import EventEmitter from 'events';
import { customHeaders, defaultHeaders } from '../core/response/response';
import { debug, trace } from '../logger/logger';

export const emitter = new EventEmitter();
emitter.setMaxListeners(2048);

export const subscribe = (req: Request, channel: string, event?: string, data?: unknown): Response => {
	return new Response(
		new ReadableStream({
			type: 'direct',
			pull(controller: ReadableStreamDirectController) {
				let id = +(req.headers.get('last-event-id') ?? 0);
				const handler = async (ev: string, dat: unknown): Promise<void> => {
					await controller.write(`id:${id}\nevent:${ev}\ndata:${dat !== undefined ? JSON.stringify(dat) : ''}\n\n`);
					await controller.flush();
					id++;
				};
				debug(`subscribing to ${channel}`);
				emitter.on(channel, handler);
				req.signal.onabort = () => {
					debug(`unsubscribing from ${channel}`);
					emitter.off(channel, handler);
				};
				void handler(event ?? 'connect', data);
				return new Promise(() => void 0);
			},
		}),
		{
			status: 200,
			headers: { ...defaultHeaders, ...customHeaders, 'content-type': 'text/event-stream;charset=utf-8' },
		},
	);
};

export const emit = (channel: string, event: string, data?: unknown): void => {
	trace(`emitting to ${channel}`);
	emitter.emit(channel, event, data);
};

Have there been breaking changes or is this new behaviour intentional?
If so, can someone advise how server sent events are supposed to be used now?

What is the expected behavior?

bun v1.1.25 behaviour

[simylein] (dev) 14:28:48 req: get /api/metrics/sse from ::1
[simylein] (dev) 14:28:48 res: 200 took 1.09 ms 0 b
[simylein] (dev) 14:28:48 debug: subscribing to metrics

What do you see instead?

bun v1.1.26 behaviour

[simylein] (dev) 14:22:26 req: get /api/metrics/sse from ::1
[simylein] (dev) 14:22:26 res: 200 took 1.79 ms 0 b
[simylein] (dev) 14:22:26 debug: subscribing to metrics
[simylein] (dev) 14:22:34 debug: unsubscribing from metrics

bun v1.1.27 behaviour

[simylein] (dev) 14:22:26 req: get /api/metrics/sse from ::1
[simylein] (dev) 14:26:03 debug: subscribing to metrics
[simylein] (dev) 14:26:03 debug: subscribing to metrics
------- a lot of lines collapsed ------
[simylein] (dev) 14:26:03 debug: subscribing to metrics
[simylein] (dev) 14:26:03 debug: subscribing to metrics
warn: Possible EventEmitter memory leak detected. 2049 metrics listeners added to [EventEmitter2]. Use emitter.setMaxListeners() to increase limit
      at overflowWarning (node:events:32:25)
      at addListener (node:events:268:57)
      at pull (/Users/simylein/Private/simylein/lib/event/event.ts:20:13)

Additional information

The code is a section of fluxify (https://github.com/simylein/fluxify) and its server-sent-events handling.

devunt commented Sep 9, 2024

Having a similar problem. The SSE response gets disconnected roughly 10 seconds after the connection has been established. In v1.1.26 it works without an issue, but after upgrading to v1.1.27 the SSE connection (response type text/event-stream) keeps getting dropped after 10 seconds of inactivity. That said, if I send a heartbeat event each second the connection does not get dropped, so it only happens on SSE connections with no events being pushed.

Jarred-Sumner (Collaborator):

> Having a similar problem. The SSE response gets disconnected roughly 10 seconds after the connection has been established. […]

Try setting idleTimeout: 0 in Bun.serve().

@cirospaciari maybe we should disable idle timeout when you start streaming
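
For reference, a minimal sketch of that suggestion, assuming the subscribe helper from the issue description above (the port is a placeholder):

Bun.serve({
  port: 4000,
  // 0 disables the server-wide idle timeout, so idle SSE connections are not dropped
  idleTimeout: 0,
  fetch(req) {
    return subscribe(req, 'metrics');
  },
});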

cirospaciari (Collaborator) commented Sep 9, 2024

@simylein

Testing with the code:

import EventEmitter, { once } from "node:events";
import net from "node:net";
const debug = console.debug;
const trace = console.trace;

const emitter = new EventEmitter();
emitter.setMaxListeners(0);
const subscribe = (req, channel, event, data) => {
  return new Response(
    new ReadableStream({
      type: "direct",
      pull(controller) {
        let id = +(req.headers.get("last-event-id") ?? 0);
        const handler = async (ev, dat) => {
          await controller.write(`id:${id}\nevent:${ev}\ndata:${dat !== undefined ? JSON.stringify(dat) : ""}\n\n`);
          await controller.flush();
          id++;
        };
        debug(`subscribing to ${channel}`);
        emitter.on(channel, handler);
        req.signal.onabort = () => {
          debug(`unsubscribing from ${channel}`);
          emitter.off(channel, handler);
        };
        handler(event ?? "connect", data);
        return new Promise(() => void 0);
      },
    }),
    {
      status: 200,
      headers: { "content-type": "text/event-stream;charset=utf-8" },
    },
  );
};

const emit = (channel, event, data) => {
  if (emitter.listeners(channel).length === 0) {
    debug(`no listeners for ${channel}`);
    return;
  }
  trace(`emitting to ${channel} (${emitter.listenerCount(channel)} listeners)`);
  emitter.emit(channel, event, data);
};

const server = Bun.serve({
  port: 0,
  async fetch(req) {
    return subscribe(req, "test", "connect", { hello: "world" });
  },
});

console.log(`Server running on ${server.url}`);

setInterval(() => {
  emit("test", "ping", { time: Date.now() });
}, 30000);

async function addClient() {
  const socket = net.connect(server.port, "localhost");
  await once(socket, "connect");
  socket.write("GET / HTTP/1.1\r\n");
  socket.write("Host: localhost\r\n");
  socket.write("Connection: keep-alive\r\n");
  socket.write("Accept: text/event-stream\r\n");
  socket.write("\r\n");
}

for (let i = 0; i < 3000; i++) {
  await addClient();
}

Left: v1.1.27, Right: v1.1.24 (screenshot: 2024-09-09 09:48:58)

In this case I leave the connection idle for 30s: the left side (v1.1.27) times out and unsubscribes everyone. If I keep it active (one event each second, for example) it will not time out, and if idleTimeout is set to a larger number, v1.1.27 respects that idleTimeout.
If I use v1.1.24 with TLS, it always times out after 10s.

The warning:

warn: Possible EventEmitter memory leak detected. 2049 metrics listeners added to [EventEmitter2]. Use emitter.setMaxListeners() to increase limit
      at overflowWarning (node:events:32:25)
      at addListener (node:events:268:57)
      at pull (/Users/simylein/Private/simylein/lib/event/event.ts:20:13)

Shows that you have too many active listeners, in this case active connections being sent events. Do you have any crash report or log that would show an error occurring and causing the application to stop? Or is the server down due to too many connections?

simylein (Contributor, Author) commented Sep 9, 2024

As far as I can tell, the pull method inside the readable stream runs recursively in bun v1.1.27 but not in bun v1.1.26 with my setup. I will try to get a more minimal reproduction, but I suspect it has to do with the non-resolving promise.

cirospaciari (Collaborator) commented Sep 9, 2024

@simylein in v1.1.27 you can also set a per-request timeout using server.timeout(request, seconds); instead of raising the idleTimeout for the entire application.
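
For illustration, a minimal sketch of that per-request approach, assuming the subscribe helper from the reproduction above (the 120-second value is just an example):

Bun.serve({
  port: 4000,
  fetch(req, server) {
    // give this specific streaming request a 120-second idle timeout
    // instead of raising idleTimeout for the whole server
    server.timeout(req, 120);
    return subscribe(req, 'metrics');
  },
});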

You can also set a total request timeout and resolve the promise when the request is aborted, never leaving unresolved promises hanging:

const subscribe = (req, channel, event, data) => {
  return new Response(
    new ReadableStream({
      type: "direct",
      pull(controller) {
        let id = +(req.headers.get("last-event-id") ?? 0);
        const handler = async (ev, dat) => {
          await controller.write(`id:${id}\nevent:${ev}\ndata:${dat !== undefined ? JSON.stringify(dat) : ""}\n\n`);
          await controller.flush();
          id++;
        };
        debug(`subscribing to ${channel}`);
        emitter.on(channel, handler);
        const { promise, resolve } = Promise.withResolvers();
        // total requestTimeout 5min
        const timer = setTimeout(resolve, 5 * 60 * 1000);
        req.signal.onabort = () => {
          debug(`unsubscribing from ${channel}`);
          emitter.off(channel, handler);
          resolve(); // resolve the promise when the client disconnects
          clearTimeout(timer); // clear the timeout when the client disconnects
        };
        handler(event ?? "connect", data);

        return promise;
      },
    }),
    {
      status: 200,
      headers: { "content-type": "text/event-stream;charset=utf-8" },
    },
  );
};

cirospaciari (Collaborator) commented Sep 9, 2024

Also noticed that in v1.1.24, if we send a larger payload at first or have some backpressure, we get the 10s timeout, just like with TLS or the default behavior in v1.1.27:

Left: v1.1.27, Right: v1.1.24 (screenshot: 2024-09-09 10:24:05)
If we have backpressure only on some connections, we get weird behavior on v1.1.24 (screenshot: 2024-09-09 10:37:50):

import EventEmitter, { once } from "node:events";
import net from "node:net";
const debug = console.debug;
const trace = console.trace;

const emitter = new EventEmitter();
emitter.setMaxListeners(0);
const payload = JSON.stringify(Buffer.alloc(64 * 1024 * 1024, "a").toString());
const subscribe = (req, channel, event, data) => {
  return new Response(
    new ReadableStream({
      type: "direct",
      async pull(controller) {
        let id = +(req.headers.get("last-event-id") ?? 0);
        const handler = async (ev, dat) => {
          await controller.write(`id:${id}\nevent:${ev}\ndata:${dat !== undefined ? JSON.stringify(dat) : ""}\n\n`);
          await controller.flush();
          id++;
        };
        debug(`subscribing to ${channel}`);
        emitter.on(channel, handler);
        const { promise, resolve } = Promise.withResolvers();
        // total requestTimeout 5min
        const timer = setTimeout(resolve, 5 * 60 * 1000);
        req.signal.onabort = () => {
          debug(`unsubscribing from ${channel}`);
          emitter.off(channel, handler);
          resolve(); // resolve the promise when the client disconnects
          clearTimeout(timer); // clear the timeout when the client disconnects
        };
        handler(event ?? "connect", payload);

        return promise;
      },
    }),
    {
      status: 200,
      headers: { "content-type": "text/event-stream;charset=utf-8" },
    },
  );
};

const emit = (channel, event, data) => {
  if (emitter.listeners(channel).length === 0) {
    debug(`no listeners for ${channel}`);
    return;
  }
  trace(`emitting to ${channel} (${emitter.listenerCount(channel)} listeners)`);
  emitter.emit(channel, event, data);
};

const server = Bun.serve({
  port: 0,
  async fetch(req) {
    return subscribe(req, "test", "connect", { hello: "world" });
  },
});

console.log(`Server running on ${server.url}`);

setInterval(() => {
  emit("test", "ping", { time: Date.now() });
}, 30000);

async function addClient() {
  const socket = net.connect(server.port, "localhost");
  await once(socket, "connect");
  socket.write("GET / HTTP/1.1\r\n");
  socket.write("Host: localhost\r\n");
  socket.write("Connection: keep-alive\r\n");
  socket.write("Accept: text/event-stream\r\n");
  socket.write("\r\n");
}

for (let i = 0; i < 100; i++) {
  await addClient();
}

simylein (Contributor, Author) commented Sep 9, 2024

@cirospaciari

You are right, this minimal sample works as intended.

import { serve } from 'bun';
import EventEmitter from 'events';

const debug = (message: string): void => {
	console.log(message);
};

const trace = (message: string): void => {
	console.log(message);
};

export const emitter = new EventEmitter();
emitter.setMaxListeners(2048);

export const subscribe = (req: Request, channel: string, event?: string, data?: unknown): Response => {
	return new Response(
		new ReadableStream({
			type: 'direct',
			pull(controller: ReadableStreamDirectController) {
				let id = +(req.headers.get('last-event-id') ?? 0);
				const handler = async (ev: string, dat: unknown): Promise<void> => {
					await controller.write(`id:${id}\nevent:${ev}\ndata:${dat !== undefined ? JSON.stringify(dat) : ''}\n\n`);
					await controller.flush();
					id++;
				};
				debug(`subscribing to ${channel}`);
				emitter.on(channel, handler);
				req.signal.onabort = () => {
					debug(`unsubscribing from ${channel}`);
					emitter.off(channel, handler);
				};
				void handler(event ?? 'connect', data);
				return new Promise(() => void 0);
			},
		}),
		{
			status: 200,
			headers: { 'content-type': 'text/event-stream;charset=utf-8' },
		},
	);
};

export const emit = (channel: string, event: string, data?: unknown): void => {
	trace(`emitting to ${channel}`);
	emitter.emit(channel, event, data);
};

serve({
	port: 4000,
	fetch(request: Request): Response {
		return subscribe(request, 'metrics');
	},
});

setInterval(() => emit('metrics', 'log'), 1000);

There must be something else I did wrong with my setup.
But still, there must have been a change, since it worked perfectly in bun v1.1.25 and broke in 1.1.27.
(And no, the timeout does not count; I know how to work around that. I am puzzled by the recursive calling behaviour.)
I will try to strip as much as possible from my giant application and get back to you with a minimal reproduction.

cirospaciari (Collaborator) commented Sep 9, 2024

@simylein Will wait for a repro. @Jarred-Sumner are you aware of any stream change in v1.1.27 that could have an impact here, besides the timeout being more consistent now?

cirospaciari (Collaborator):

> Having a similar problem. The SSE response gets disconnected roughly 10 seconds after the connection has been established. […]

Increasing the idleTimeout in the server config or using server.timeout(request, seconds) will change/increase the timeout. You can disable it only for the SSE case using server.timeout(request, 0), or raise it to something like 120 if you expect connections to idle for that long. I would recommend raising the timeout or keeping a total request timeout so the server doesn't keep any idling socket longer than expected.

simylein (Contributor, Author) commented Sep 9, 2024

@cirospaciari here is a more minimal reproduction:

import { serve, Serve, Server } from 'bun';
import EventEmitter from 'events';

const req = (path: string, ip: string): void => {
	console.log(`req: ${path} from ${ip}`);
};

const res = (status: number, bytes: number): void => {
	console.log(`res: ${status} ${bytes} bytes`);
};

const trace = (message: string): void => {
	console.log(`trace: ${message}`);
};

const debug = (message: string): void => {
	console.log(`debug: ${message}`);
};

const info = (message: string): void => {
	console.log(`info: ${message}`);
};

const warn = (message: string): void => {
	console.log(`warn: ${message}`);
};

type FluxifyServer = Server & {
	routes: Routes;
};

type FluxifyRequest = Request & {
	ip: string;
};

type Routes = Map<string, Routes | Route>;

type Route = {
	method: string;
	endpoint: string;
	// eslint-disable-next-line @typescript-eslint/no-explicit-any
	handler: ({ param, query, body, jwt, req }: any) => Response;
};

declare global {
	// eslint-disable-next-line no-var
	var server: FluxifyServer;
}

const traverse = (router: Routes, endpoint: string): Routes | null => {
	if (endpoint.endsWith('/') && endpoint !== '/') {
		return null;
	}
	const frags = endpoint.split('/').filter((frag) => !!frag);
	const walk = (parent: Routes, ind: number): Routes | null => {
		if (ind >= frags.length) {
			if ([...parent.values()].every((value) => value instanceof Map)) {
				return null;
			}
			return parent;
		}
		const child = parent.get(frags[ind]) as Routes | undefined;
		if (!child) {
			for (const [key, value] of parent.entries()) {
				if (key.startsWith(':')) {
					const result = walk(value as Routes, ind + 1);
					if (result) {
						return result;
					}
				}
			}
			return null;
		}
		return walk(child, ind + 1);
	};
	return walk(router, 0);
};

const pick = (matching: Routes | null, method: string): Route | undefined => {
	const route = matching?.get(method === 'head' ? 'get' : method) as Route | undefined;
	if (!route) {
		return matching?.get('all') as Route | undefined;
	}
	return route;
};

const bootstrap = (): FluxifyServer => {
	const options: Serve = {
		port: 4000,
		async fetch(request: FluxifyRequest, server: Server): Promise<Response> {
			request.ip = server.requestIP(request)?.address ?? '';

			const url = new URL(request.url);
			req(url.pathname, request.ip);

			const matchingRoutes = traverse(global.server.routes, url.pathname);
			const targetRoute = pick(matchingRoutes, request.method.toLowerCase());

			if (targetRoute) {
				const data = await targetRoute.handler({ param: null, query: null, body: null, jwt: null, req: request });
				if (data instanceof Response) {
					res(data.status, (await data.clone().blob()).size);
					return data;
				}
			}
			return new Response(null, { status: 404 });
		},
	};

	info(`starting http server...`);
	if (!global.server) {
		global.server = serve(options) as FluxifyServer;
	} else {
		global.server.reload(options);
	}
	global.server.routes = routes;
	info(`listening on localhost:${server.port}`);
	return global.server;
};

const routes: Routes = new Map();

const register = (route: Route): void => {
	const frags = route.endpoint.split('/').filter((frag) => !!frag);
	const walk = (parent: Routes, ind: number): void => {
		if (ind >= frags.length) {
			if (parent.has(route.method)) {
				return warn(`ambiguous route ${route.method} ${route.endpoint}`);
			}
			parent.set(route.method, route);
			return debug(`mapped route ${route.method} ${route.endpoint}`);
		}
		if (!parent.has(frags[ind])) {
			parent.set(frags[ind], new Map());
		}
		const child = parent.get(frags[ind])! as Routes;
		walk(child, ind + 1);
	};
	walk(routes, 0);
};

const emitter = new EventEmitter();
emitter.setMaxListeners(2048);

const subscribe = (request: Request, channel: string, event?: string, data?: unknown): Response => {
	return new Response(
		new ReadableStream({
			type: 'direct',
			pull(controller: ReadableStreamDirectController) {
				let id = +(request.headers.get('last-event-id') ?? 0);
				const handler = async (ev: string, dat: unknown): Promise<void> => {
					await controller.write(`id:${id}\nevent:${ev}\ndata:${dat !== undefined ? JSON.stringify(dat) : ''}\n\n`);
					await controller.flush();
					id++;
				};
				debug(`subscribing to ${channel}`);
				emitter.on(channel, handler);
				request.signal.onabort = () => {
					debug(`unsubscribing from ${channel}`);
					emitter.off(channel, handler);
				};
				void handler(event ?? 'connect', data);
				return new Promise(() => void 0);
			},
		}),
		{
			status: 200,
			headers: { 'content-type': 'text/event-stream;charset=utf-8' },
		},
	);
};

const emit = (channel: string, event: string, data?: unknown): void => {
	trace(`emitting to ${channel}`);
	emitter.emit(channel, event, data);
};

register({
	method: 'get',
	endpoint: '/sse',
	handler: ({ req: request }) => {
		info('streaming metrics');
		return subscribe(request, 'metrics');
	},
});

bootstrap();

setInterval(() => emit('metrics', 'log'), 2000);

bun v1.1.25

debug: mapped route get /sse
info: starting http server...
info: listening on localhost:4000
req: /sse from ::1
info: streaming metrics
res: 200 0 bytes
debug: subscribing to metrics
trace: emitting to metrics
debug: unsubscribing from metrics
trace: emitting to metrics

bun v1.1.27

--- too many log lines above ---
debug: subscribing to metrics
debug: subscribing to metrics
debug: subscribing to metrics
debug: subscribing to metrics
debug: subscribing to metrics
debug: subscribing to metrics
debug: subscribing to metrics
debug: subscribing to metrics
debug: subscribing to metrics
--- too many log lines below ---

cirospaciari (Collaborator) commented Sep 9, 2024

@simylein looks like if you comment out the line res(data.status, (await data.clone().blob()).size); it works normally again.

It is related to data.clone() being called; without calling clone, we never see pull being called an infinite number of times.
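
For context, a condensed sketch of the pattern that triggers the repeated pull calls, using the subscribe and res helpers from the reproduction above:

Bun.serve({
  port: 4000,
  async fetch(req) {
    const data = subscribe(req, 'metrics');
    // reading the clone's body before returning the streamed response
    // appears to be what re-triggers pull on v1.1.27
    res(data.status, (await data.clone().blob()).size);
    return data;
  },
});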

cirospaciari (Collaborator) commented Sep 9, 2024

Pull will be called as many times as it takes until controller.close() is called, but returning a promise should make it wait until the promise is resolved before calling it again.

Jarred-Sumner (Collaborator) commented Sep 10, 2024

> Pull will be called as many times as it takes until controller.close() is called, but returning a promise should make it wait until the promise is resolved before calling it again.

Note: this is not correct - "direct" streams are supposed to only ever call pull once and that was the bug here.
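
To illustrate the intended contract, a minimal sketch of a "direct" stream (the payload and delay are made up; the point is that pull runs a single time and the controller stays writable until it is closed):

const response = new Response(
  new ReadableStream({
    type: 'direct',
    async pull(controller) {
      // with a "direct" stream, pull is expected to run only once
      await controller.write('data: hello\n\n');
      await controller.flush();
      // keep the stream open for a while, then end it explicitly
      await new Promise((resolve) => setTimeout(resolve, 60_000));
      controller.close();
    },
  }),
  { headers: { 'content-type': 'text/event-stream;charset=utf-8' } },
);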

devcaeg commented Sep 11, 2024

I can confirm that in bun 1.1.27, with a ReadableStream, if no event is sent at least every 10 seconds, the connection is closed.
