|
| 1 | +package com.github.domgew.kredis.impl |
| 2 | + |
| 3 | +import com.github.domgew.kredis.KredisClient |
| 4 | +import com.github.domgew.kredis.KredisConfiguration |
| 5 | +import com.github.domgew.kredis.KredisException |
| 6 | +import com.github.domgew.kredis.commands.KredisCommand |
| 7 | +import io.ktor.network.selector.SelectorManager |
| 8 | +import io.ktor.network.sockets.InetSocketAddress |
| 9 | +import io.ktor.network.sockets.Socket |
| 10 | +import io.ktor.network.sockets.aSocket |
| 11 | +import io.ktor.network.sockets.awaitClosed |
| 12 | +import io.ktor.network.sockets.openReadChannel |
| 13 | +import io.ktor.network.sockets.openWriteChannel |
| 14 | +import io.ktor.utils.io.ByteReadChannel |
| 15 | +import io.ktor.utils.io.ByteWriteChannel |
| 16 | +import kotlinx.coroutines.Dispatchers |
| 17 | +import kotlinx.coroutines.IO |
| 18 | +import kotlinx.coroutines.TimeoutCancellationException |
| 19 | +import kotlinx.coroutines.isActive |
| 20 | +import kotlinx.coroutines.sync.Mutex |
| 21 | +import kotlinx.coroutines.withTimeout |
| 22 | + |
| 23 | +internal abstract class AbstractKredisClient( |
| 24 | + protected val configuration: KredisConfiguration, |
| 25 | +): KredisClient { |
| 26 | + protected val lock = Mutex() |
| 27 | + private var _socket: Socket? = null |
| 28 | + private var _writeChannel: ByteWriteChannel? = null |
| 29 | + private var _readChannel: ByteReadChannel? = null |
| 30 | + |
| 31 | + final override val isConnected: Boolean |
| 32 | + get() = _socket != null |
| 33 | + && _writeChannel != null |
| 34 | + && _readChannel != null |
| 35 | + && _socket?.isActive == true |
| 36 | + |
| 37 | + private suspend fun doConnect() { |
| 38 | + val socket = try { |
| 39 | + withTimeout(configuration.connectionTimeoutMillis) { |
| 40 | + aSocket(SelectorManager(Dispatchers.IO)) |
| 41 | + .tcp() |
| 42 | + .connect( |
| 43 | + remoteAddress = InetSocketAddress( |
| 44 | + hostname = configuration.host, |
| 45 | + port = configuration.port, |
| 46 | + ), |
| 47 | + configure = { |
| 48 | + keepAlive = configuration.keepAlive |
| 49 | + }, |
| 50 | + ) |
| 51 | + } |
| 52 | + } catch (ex: TimeoutCancellationException) { |
| 53 | + throw KredisException.ConnectionTimeout |
| 54 | + } |
| 55 | + this._socket = socket |
| 56 | + this._readChannel = socket.openReadChannel() |
| 57 | + this._writeChannel = socket.openWriteChannel(autoFlush = false) |
| 58 | + } |
| 59 | + |
| 60 | + protected suspend fun ensureConnected() { |
| 61 | + if (!isConnected) { |
| 62 | + doConnect() |
| 63 | + } |
| 64 | + } |
| 65 | + |
| 66 | + protected suspend fun doClose() { |
| 67 | + _socket?.close() |
| 68 | + _socket?.awaitClosed() |
| 69 | + } |
| 70 | + |
| 71 | + protected suspend fun executeCommand( |
| 72 | + command: KredisCommand, |
| 73 | + ): RedisMessage { |
| 74 | + ensureConnected() |
| 75 | + command.toRedisMessage().writeTo(_writeChannel!!) |
| 76 | + _writeChannel!!.flush() |
| 77 | + return RedisMessage.parse(_readChannel!!) |
| 78 | + } |
| 79 | +} |
0 commit comments