Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suggestions for improving code performance #478

Open
Crypto7816 opened this issue Sep 17, 2024 · 5 comments
Open

Suggestions for improving code performance #478

Crypto7816 opened this issue Sep 17, 2024 · 5 comments

Comments

@Crypto7816
Copy link

This code calculates, in real time, the rolling average of (futures price / spot price − 1). Since a large amount of data can stream in from the websocket every second (about 100–200 data points), I'd like to know whether you have any suggestions for improving performance.
image

import asyncio
import numpy as np
from streamz import Stream
from tradebot.exchange import BinanceWebsocketManager
from tradebot.entity import log_register
from tradebot.constants import MARKET_URLS

# Pipeline overview:
#   1. compute the futures price divided by the spot price, minus 1
#   2. push each ratio into a rolling window of up to `window_size` values
#   3. report the mean of that window

log = log_register.get_logger("BTCUSDT", level="INFO", flush=False)

# Source streams, one per market, fed by cb_spot / cb_future.
spot_stream, future_stream = Stream(), Stream()

# Number of ratios averaged per rolling-window mean.
window_size = 20

def cb_future(msg):
    """Forward a futures websocket message into ``future_stream``.

    Messages without an ``"e"`` key are ignored (presumably subscription
    acknowledgements rather than trade events -- confirm against the feed).
    """
    if "e" not in msg:
        return
    future_stream.emit(msg)
    
def cb_spot(msg):
    """Push a spot-market websocket message onto ``spot_stream``.

    Only messages carrying an ``"e"`` key are forwarded; anything else is
    silently dropped.
    """
    is_event = "e" in msg
    if is_event:
        spot_stream.emit(msg)

async def main():
    """Stream BTCUSDT spot and futures trades and print the rolling mean ratio.

    The streamz pipeline is wired first, then one trade subscription is opened
    per market, and the coroutine idles until cancelled. Both websocket
    managers are closed on every exit path (cancellation or error). The
    cancellation is then allowed to propagate rather than being swallowed.
    """

    def basis(pair):
        # combine_latest(spot, future) emits (latest spot msg, latest future msg);
        # "p" is the price field, received as a string.
        spot_msg, future_msg = pair
        return float(future_msg["p"]) / float(spot_msg["p"]) - 1

    # Build the pipeline before any subscription can start delivering messages.
    ratio = spot_stream.combine_latest(future_stream).map(basis)
    ratio.sliding_window(window_size).map(np.mean).sink(
        lambda mean: print(f"Ratio Mean: {mean:.8f}")
    )

    # Created outside the try so the cleanup below never sees unbound names.
    ws_spot_client = BinanceWebsocketManager(base_url="wss://stream.binance.com:9443/ws")
    ws_um_client = BinanceWebsocketManager(base_url="wss://fstream.binance.com/ws")
    try:
        await ws_um_client.subscribe_trade("BTCUSDT", callback=cb_future)
        await ws_spot_client.subscribe_trade("BTCUSDT", callback=cb_spot)
        # Keep this task alive; the managers' background tasks do the work.
        while True:
            await asyncio.sleep(1)
    finally:
        # Runs on cancellation (e.g. Ctrl-C under asyncio.run) and on errors;
        # CancelledError is re-raised afterwards instead of being swallowed.
        await ws_spot_client.close()
        await ws_um_client.close()
        print("Websocket closed")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop this script; main() already cleaned up.
        pass

Here is the implementation of BinanceWebsocketManager and its base class, WebsocketManager:

class WebsocketManager(ABC):
    """Base class that fans websocket messages out to per-subscription callbacks.

    Subclasses put decoded messages onto ``self._subscripions[subscription_id]``.
    A ``_consume`` task drains that queue and invokes the user's callback.
    """

    def __init__(
        self,
        base_url: str,
        ping_interval: int = 5,
        ping_timeout: int = 5,
        close_timeout: int = 1,
        max_queue: int = 12,
    ):
        self._base_url = base_url
        # Connection settings, stored for subclasses to pass to their websocket client.
        self._ping_interval = ping_interval
        self._ping_timeout = ping_timeout
        self._close_timeout = close_timeout
        self._max_queue = max_queue

        # Every background task started by subscriptions, so close() can cancel them.
        self._tasks: List[asyncio.Task] = []
        # subscription_id -> asyncio.Queue of messages; the (sic) spelling is used by subclasses.
        self._subscripions = defaultdict(asyncio.Queue)
        self._log = log_register.get_logger(name=type(self).__name__, level="INFO", flush=True)

    async def _consume(self, subscription_id: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Deliver every queued message for ``subscription_id`` to ``callback``.

        Runs until cancelled. ``callback`` may be a plain function or a coroutine
        function and is called as ``callback(msg, *args, **kwargs)``.

        If ``callback`` is None, messages are drained and discarded. An exception
        raised by the callback is logged and the loop keeps going. Otherwise one
        bad message would silently end the subscription while its queue kept
        growing.
        """
        queue = self._subscripions[subscription_id]
        # Decide once rather than re-inspecting the callback for every message.
        is_async = asyncio.iscoroutinefunction(callback)
        while True:
            msg = await queue.get()
            try:
                if callback is None:
                    continue
                if is_async:
                    await callback(msg, *args, **kwargs)
                else:
                    callback(msg, *args, **kwargs)
            except Exception as exc:  # CancelledError is not an Exception, so it still propagates
                self._log.error(f"Callback for {subscription_id} failed: {exc!r}")
            finally:
                queue.task_done()

    @abstractmethod
    async def _subscribe(self, symbol: str, typ: str, channel: str, queue_id: str):
        """Open the websocket for one subscription and feed its queue.

        NOTE(review): BinanceWebsocketManager._subscribe takes
        ``(payload, subscription_id)`` instead of this signature; consider
        aligning the two.
        """

    async def close(self):
        """Cancel all background tasks and wait for them to finish.

        Exceptions from the tasks (including cancellation) are collected, not
        raised. Afterwards the task list and the subscription queues are reset,
        so a later subscribe can start fresh and a second close() has nothing
        left to cancel.
        """
        tasks, self._tasks = self._tasks, []
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        self._subscripions.clear()
        self._log.info("All WebSocket connections closed.")
        
class BinanceWebsocketManager(WebsocketManager):
    """Websocket manager that subscribes to Binance streams by name.

    Each ``subscribe_*`` call sends one ``SUBSCRIBE`` request for a single
    stream and starts two tasks:

    - a reader (``_subscribe``) that pushes decoded messages onto a queue;
    - a consumer (``_consume``) that hands those messages to ``callback``.
    """

    def __init__(self, base_url: str):
        super().__init__(
            base_url=base_url,
            ping_interval=5,
            ping_timeout=5,
            close_timeout=1,
            max_queue=12,
        )

    async def _subscribe(self, payload: Dict[str, Any], subscription_id: str):
        """Keep a websocket open for ``payload`` and enqueue every message.

        ``websockets.connect`` used as an async iterator yields a fresh
        connection whenever the loop body finishes, so after a ConnectionClosed
        the SUBSCRIBE request is sent again on the new connection.
        """
        # Serialize exactly once. Re-encoding inside the loop (as before) turned
        # the payload into a JSON-encoded *string* after the first reconnect,
        # which the server presumably rejects -- losing the subscription.
        message = json.dumps(payload)
        queue = self._subscripions[subscription_id]
        async for websocket in websockets.connect(
            uri=self._base_url,
            ping_interval=self._ping_interval,
            ping_timeout=self._ping_timeout,
            close_timeout=self._close_timeout,
            max_queue=self._max_queue,
        ):
            try:
                await websocket.send(message)
                async for raw in websocket:
                    await queue.put(orjson.loads(raw))
            except websockets.ConnectionClosed:
                self._log.error("Connection closed, reconnecting...")

    async def _subscribe_stream(
        self,
        subscription_id: str,
        stream: str,
        callback: Callable[..., Any] = None,
        *args,
        **kwargs,
    ):
        """Start reader/consumer tasks for one Binance stream unless already subscribed."""
        if subscription_id in self._subscripions:
            self._log.info(f"Already subscribed to {subscription_id}")
            return
        # Register the queue immediately. A second call made before the tasks
        # have run is then detected as a duplicate instead of spawning another pair.
        self._subscripions[subscription_id] = asyncio.Queue()
        payload = {
            "method": "SUBSCRIBE",
            "params": [stream],
            "id": int(time.time() * 1000),  # request id: current time in milliseconds
        }
        # Pass callback positionally so extra *args follow it. Previously
        # `callback=callback, *args` bound the first extra arg to `callback` too,
        # raising "got multiple values for argument 'callback'".
        self._tasks.append(asyncio.create_task(self._consume(subscription_id, callback, *args, **kwargs)))
        self._tasks.append(asyncio.create_task(self._subscribe(payload, subscription_id)))

    async def subscribe_book_ticker(self, symbol: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Subscribe to the ``<symbol>@bookTicker`` stream for ``symbol``."""
        await self._subscribe_stream(
            f"book_ticker.{symbol}", f"{symbol.lower()}@bookTicker", callback, *args, **kwargs
        )

    async def subscribe_book_tickers(self, symbols: List[str], callback: Callable[..., Any] = None, *args, **kwargs):
        """Call ``subscribe_book_ticker`` for each symbol with the same callback."""
        for symbol in symbols:
            await self.subscribe_book_ticker(symbol, callback, *args, **kwargs)

    async def subscribe_trade(self, symbol: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Subscribe to the ``<symbol>@trade`` stream for ``symbol``."""
        await self._subscribe_stream(
            f"trade.{symbol}", f"{symbol.lower()}@trade", callback, *args, **kwargs
        )

    async def subscribe_trades(self, symbols: List[str], callback: Callable[..., Any] = None, *args, **kwargs):
        """Call ``subscribe_trade`` for each symbol with the same callback."""
        for symbol in symbols:
            await self.subscribe_trade(symbol, callback, *args, **kwargs)

    async def subscribe_agg_trade(self, symbol: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Subscribe to the ``<symbol>@aggTrade`` stream for ``symbol``."""
        await self._subscribe_stream(
            f"agg_trade.{symbol}", f"{symbol.lower()}@aggTrade", callback, *args, **kwargs
        )

    async def subscribe_agg_trades(self, symbols: List[str], callback: Callable[..., Any] = None, *args, **kwargs):
        """Call ``subscribe_agg_trade`` for each symbol with the same callback."""
        for symbol in symbols:
            await self.subscribe_agg_trade(symbol, callback, *args, **kwargs)
@martindurant
Copy link
Member

Do you find that performance is lacking? 100 events per second is not a lot for streamz. However, the library you are using and the websocket latency are separate factors, as is any CPU time your calculation itself might need — I can't tell that from your code.

@Crypto7816
Copy link
Author

Crypto7816 commented Sep 18, 2024

Do you find that performance is lacking? 100 events per second is not a lot for streamz. However, the library you are using and the websocket latency are separate factors, as is any CPU time your calculation itself might need — I can't tell that from your code.

Thanks for the help. I have another question:

from streamz import Stream
import asyncio
import time

def increment(x):
    """Return ``x + 1`` after blocking the thread for 0.1 s (simulated CPU work)."""
    time.sleep(0.1)
    result = x + 1
    return result

async def write(x):
    """Print ``x`` after a non-blocking 0.2 s pause (simulated async I/O)."""
    delay = 0.2
    await asyncio.sleep(delay)
    print(x)

async def f():
    """Push 0..9 through increment -> rate_limit(0.5 s) -> write, one value at a time."""
    source = Stream(asynchronous=True)
    limited = source.map(increment).rate_limit(0.500)
    limited.sink(write)

    for value in range(10):
        # Awaiting each emit means the next value is only sent once this one is handled.
        await source.emit(value)


if __name__ == "__main__":
    asyncio.run(f())
from tornado import gen
import time
from streamz import Stream
from tornado.ioloop import IOLoop
def increment(x):
    """Return ``x + 1``, blocking for 0.1 s first.

    Stands in for a computational function that was never written with
    asynchronous use in mind.
    """
    pause = 0.1
    time.sleep(pause)
    return x + 1

@gen.coroutine
def write(x):
    """Print ``x`` once a non-blocking 0.2 s pause has elapsed.

    Stands in for an asynchronous database write.
    """
    pause = gen.sleep(0.2)
    yield pause
    print(x)


@gen.coroutine
def f():
    """Feed 0..9 through increment -> rate_limit(0.5 s) -> write, one value at a time."""
    source = Stream(asynchronous=True)  # asynchronous mode: emit() returns something to yield on
    source.map(increment).rate_limit(0.500).sink(write)

    for value in range(10):
        yield source.emit(value)  # wait for this value before sending the next


IOLoop().run_sync(f)

What is the difference between these two examples? I don't think there's any difference in performance between using async and sync here; await emit still blocks the subsequent processing. I tried using asyncio.create_task(source.emit(x)), but that just threw an error.

I think there is no difference from this fully synchronous version:

from streamz import Stream
import asyncio
import time

def increment(x):
    """Sleep 0.1 s, then give back the successor of ``x``."""
    time.sleep(0.1)
    return 1 + x

def write(x):
    """Blocking write: sleep 0.2 s, then print ``x``."""
    wait_seconds = 0.2
    time.sleep(wait_seconds)
    print(x)

def f():
    """Synchronously push 0..9 through increment -> rate_limit(0.5 s) -> write."""
    source = Stream()
    source.map(increment).rate_limit(0.500).sink(write)
    for value in range(10):
        source.emit(value)


if __name__ == "__main__":
    f()

@martindurant
Copy link
Member

Correct: for a linear chain of event processing there will be no difference. The point of await is that other async work can happen at the same time ("concurrently"). In this case, there is nothing else to process while waiting.

@Crypto7816
Copy link
Author

Correct: for a linear chain of event processing there will be no difference. The point of await is that other async work can happen at the same time ("concurrently"). In this case, there is nothing else to process while waiting.

Can you give me some examples of concurrent or non-linear event processing? I'm really struggling to think of any applications. I am trying to emit concurrently, but it causes an error.

from streamz import Stream
import asyncio
import time

def increment(x):
    """Block for a tenth of a second, then return ``x + 1``."""
    seconds = 0.1
    time.sleep(seconds)
    return x + 1

async def write(x):
    """Yield to the event loop for 0.2 s, then print ``x``."""
    pause = asyncio.sleep(0.2)
    await pause
    print(x)

async def f():
    """Emit 0..9 concurrently into an asynchronous stream and wait for all of them.

    The first version of this loop called ``asyncio.create_task(source.emit(x))``
    and raised an error, for two reasons:

    - ``asyncio.create_task`` accepts only coroutines, and ``emit`` on an
      asynchronous stream presumably returns a Future (confirm).
    - The created tasks were never awaited before ``asyncio.run`` shut the
      loop down.

    ``asyncio.gather`` accepts any awaitable and keeps ``f`` running until every
    emit has finished.

    ``increment`` still calls ``time.sleep``, which blocks the event loop while
    each emit is being started.
    """
    source = Stream(asynchronous=True)
    source.map(increment).rate_limit(0.500).sink(write)

    await asyncio.gather(*(source.emit(x) for x in range(10)))


if __name__ == "__main__":
    asyncio.run(f())

@martindurant
Copy link
Member

martindurant commented Sep 18, 2024

    for x in range(10):
        asyncio.create_task(source.emit(x)) # raises an error

This does indeed kick off all the coroutines, but each one first does a blocking wait (the time.sleep in increment) and then waits again before producing output.

Consider:

async def write(x):
    """Print ``x`` immediately, then give up control for 0.2 s."""
    print(x)
    pause = asyncio.sleep(0.2)
    await pause

async def f():
    """Emit 0..9 concurrently; every value prints at once and the run takes ~0.2 s."""
    source = Stream(asynchronous=True)
    source.sink(write)
    pending = [source.emit(value) for value in range(10)]
    await asyncio.gather(*pending)


if __name__ == "__main__":
    asyncio.run(f())

Here, all the values print immediately, and the whole takes 0.2s to run.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants