@@ -28,8 +28,8 @@ function RedisConnection(host, port, options) {
28
28
this . _bound_onConnect = this . _onConnect . bind ( this )
29
29
this . _bound_onError = this . _onError . bind ( this )
30
30
this . _bound_onEnd = this . _onEnd . bind ( this )
31
-
32
- // Controls if we turn on compression or not.
31
+
32
+ // Controls if we turn on compression or not.
33
33
// All cache values which are longer than the pivot are eligible for compression
34
34
// Pivot and encoding prefix are hardcoded for now. Will revisit after
35
35
// we know we are using snappy for sure
@@ -47,42 +47,52 @@ RedisConnection.prototype.isAvailable = function () {
47
47
48
48
/** @override */
49
49
RedisConnection . prototype . set = function ( key , val , maxAgeMs , setWhenNotExist ) {
50
- var deferred = Q . defer ( )
51
- var params = [ key , this . _compress ( val ) , 'PX' , maxAgeMs ]
52
- if ( setWhenNotExist ) params . push ( 'NX' )
53
- this . _client . set ( params , this . _makeNodeResolverWithTimeout ( deferred , 'set' , 'Redis [set] key: ' + key ) )
54
- return deferred . promise
50
+ return this . _compress ( val )
51
+ . thenBound ( function ( compressedVal ) {
52
+ var params = [ key , compressedVal , 'PX' , maxAgeMs ]
53
+ if ( setWhenNotExist ) params . push ( 'NX' )
54
+
55
+ var deferred = Q . defer ( )
56
+ this . _client . set ( params , this . _makeNodeResolverWithTimeout ( deferred , 'set' , 'Redis [set] key: ' + key ) )
57
+ return deferred . promise
58
+ } , this )
55
59
}
56
60
57
61
/** @override */
58
62
RedisConnection . prototype . mset = function ( items , maxAgeMs , setWhenNotExist ) {
59
63
if ( ! items || ! items . length ) return Q . resolve ( undefined )
60
64
61
- var deferred = Q . defer ( )
62
- var commands = [ ]
63
- var i , l
64
- if ( setWhenNotExist ) {
65
- // Use "SET" to set each key with a "NX" flag.
66
- for ( i = 0 , l = items . length ; i < l ; i ++ ) {
67
- commands . push ( [ 'set' , items [ i ] . key , this . _compress ( items [ i ] . value ) , 'PX' , maxAgeMs , 'NX' ] )
68
- }
69
- } else {
70
- // Use "MSET" to set all the keys and "EXPIRE" to set TTL for each key
71
- var msetCommand = [ 'MSET' ]
72
- commands . push ( msetCommand )
73
- for ( i = 0 , l = items . length ; i < l ; i ++ ) {
74
- var key = items [ i ] . key
75
- // Append key value arguments to the set command.
76
- msetCommand . push ( key , this . _compress ( items [ i ] . value ) )
77
- // Append an expire command.
78
- commands . push ( [ 'EXPIRE' , key , Math . floor ( maxAgeMs / 1000 ) ] )
65
+ var compressedPromises = items . map ( function ( item ) {
66
+ return this . _compress ( item . value )
67
+ } , this )
68
+ return Q . all ( compressedPromises )
69
+ . thenBound ( function ( compressedValues ) {
70
+ var deferred = Q . defer ( )
71
+ var commands = [ ]
72
+
73
+ var i , l
74
+ if ( setWhenNotExist ) {
75
+ // Use "SET" to set each key with a "NX" flag.
76
+ for ( i = 0 , l = items . length ; i < l ; i ++ ) {
77
+ commands . push ( [ 'set' , items [ i ] . key , compressedValues [ i ] , 'PX' , maxAgeMs , 'NX' ] )
78
+ }
79
+ } else {
80
+ // Use "MSET" to set all the keys and "EXPIRE" to set TTL for each key
81
+ var msetCommand = [ 'MSET' ]
82
+ commands . push ( msetCommand )
83
+ for ( i = 0 , l = items . length ; i < l ; i ++ ) {
84
+ var key = items [ i ] . key
85
+ // Append key value arguments to the set command.
86
+ msetCommand . push ( key , compressedValues [ i ] )
87
+ // Append an expire command.
88
+ commands . push ( [ 'EXPIRE' , key , Math . floor ( maxAgeMs / 1000 ) ] )
89
+ }
79
90
}
80
- }
81
- this . _client . multi ( commands ) . exec (
82
- this . _makeNodeResolverWithTimeout ( deferred , 'mset' ,
83
- 'Redis [mset] key.0: ' + items [ 0 ] . key + ' key.length: ' + items . length ) )
84
-
85
- return deferred . promise
91
+ this . _client . multi ( commands ) . exec (
92
+ this . _makeNodeResolverWithTimeout ( deferred , 'mset' ,
93
+ 'Redis [mset] key.0: ' + items [ 0 ] . key + ' key.length: ' + items . length ) )
94
+ return deferred . promise
95
+ } , this )
86
96
}
87
97
88
98
/** @override */
@@ -109,7 +119,7 @@ RedisConnection.prototype.mget = function (keys) {
109
119
this . _makeNodeResolverWithTimeout ( deferred , 'mget' ,
110
120
opDesc ) )
111
121
return deferred . promise
112
- . then ( function ( vals ) {
122
+ . thenBound ( function ( vals ) {
113
123
// This function post-processes values from Redis client to
114
124
// make cache miss result consistent with the API.
115
125
//
@@ -119,12 +129,12 @@ RedisConnection.prototype.mget = function (keys) {
119
129
if ( null === vals [ i ] ) {
120
130
vals [ i ] = undefined
121
131
} else {
122
- //for real values determine if you need to decompress
123
- vals [ i ] = self . _decompress ( vals [ i ] )
132
+ //for real values determine if you need to uncompress
133
+ vals [ i ] = this . _uncompress ( vals [ i ] )
124
134
}
125
135
}
126
- return vals
127
- } )
136
+ return Q . all ( vals )
137
+ } , this )
128
138
. then ( this . getCountUpdater ( ) )
129
139
}
130
140
@@ -259,50 +269,50 @@ RedisConnection.prototype._makeNodeResolverWithTimeout = function (deferred, opN
259
269
* Private method controls how all cache values are encoded.
260
270
*
261
271
* @param {string|undefined|null } value Original cache value
262
- * @return {string|undefined|null } Value encoded appropriately for the cache
272
+ * @return {Q.Promise.< string|undefined|null> } Value encoded appropriately for the cache
263
273
*/
264
274
RedisConnection . prototype . _compress = function ( value ) {
265
275
if ( ! value || ! this . _compressionEnabled ) {
266
- return value
276
+ return Q . resolve ( value )
267
277
}
268
278
269
279
if ( value . length > this . _snappyPivot ) {
270
280
try {
271
- var compressed = snappy . compressSync ( value )
272
- return this . _compressedPrefix + compressed . toString ( 'base64' )
281
+ return Q . nfcall ( snappy . compress , value ) . thenBound ( function ( compressed ) {
282
+ return this . _compressedPrefix + compressed . toString ( 'base64' )
283
+ } , this )
273
284
} catch ( e ) {
274
285
console . warn ( "Compression failed: " + e . message )
275
- return this . _uncompressedPrefix + value
286
+ return Q . resolve ( this . _uncompressedPrefix + value )
276
287
}
277
288
} else {
278
- return this . _uncompressedPrefix + value
289
+ return Q . resolve ( this . _uncompressedPrefix + value )
279
290
}
280
291
}
281
292
282
293
/**
283
294
* Private Method that knows how to parsed encoded cache value and decode.
284
295
*
285
296
* @param {string|undefined|null } value Possibly encoded value retrieved from the cache.
286
- * @return {string|undefined|null } The original input value
297
+ * @return {Q.Promise.< string|undefined|null> } The original input value
287
298
*/
288
- RedisConnection . prototype . _decompress = function ( value ) {
289
- if ( ! value ) return value
299
+ RedisConnection . prototype . _uncompress = function ( value ) {
300
+ if ( ! value ) return Q . resolve ( value )
290
301
291
302
// Note: always check prefixes even if compression is disabled, as there might
292
303
// be entries from prior to disabling compression
293
304
if ( value . indexOf ( this . _compressedPrefix ) === 0 ) {
294
305
try {
295
306
var compressedBuf = new Buffer ( value . substring ( this . _compressedPrefix . length ) , 'base64' )
296
- var orig = snappy . decompressSync ( compressedBuf , snappy . parsers . string )
297
- return orig
307
+ return Q . nfcall ( snappy . uncompress , compressedBuf , { asBuffer : false } )
298
308
} catch ( e ) {
299
309
console . warn ( "Decompression failed: " + e . message )
300
- return undefined
301
- }
310
+ return Q . resolve ( undefined )
311
+ }
302
312
} else if ( value . indexOf ( this . _uncompressedPrefix ) === 0 ) {
303
- return value . substring ( this . _uncompressedPrefix . length )
313
+ return Q . resolve ( value . substring ( this . _uncompressedPrefix . length ) )
304
314
} else {
305
- return value
315
+ return Q . resolve ( value )
306
316
}
307
317
}
308
318
0 commit comments