Skip to content

Commit d661bf8

Browse files
Add support for T-Digest (#25)
1 parent 849e48c commit d661bf8

File tree

2 files changed

+183
-1
lines changed

2 files changed

+183
-1
lines changed

redisbloom/client.py

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,25 @@ def __init__(self, args):
6565
self.depth = response['depth']
6666
self.decay = response['decay']
6767

68+
class TDigestInfo(object):
    """
    Attribute-style view of a ``TDIGEST.INFO`` reply.

    The reply is consumed as a flat sequence of alternating field names and
    values; every field name is passed through ``nativestr`` before lookup.
    """
    compression = None
    capacity = None
    mergedNodes = None
    unmergedNodes = None
    mergedWeight = None
    unmergedWeight = None
    totalCompressions = None

    def __init__(self, args):
        """
        Populate the attributes from ``args``.

        ``args`` is read as ``[name, value, name, value, ...]``. A field that
        is absent from the reply raises ``KeyError``.
        """
        field_names = [nativestr(raw_name) for raw_name in args[::2]]
        reply = dict(zip(field_names, args[1::2]))

        # (attribute on this object, field name in the reply), listed in
        # the order in which the attributes are assigned.
        attribute_fields = (
            ('compression', 'Compression'),
            ('capacity', 'Capacity'),
            ('mergedNodes', 'Merged nodes'),
            ('unmergedNodes', 'Unmerged nodes'),
            ('mergedWeight', 'Merged weight'),
            ('unmergedWeight', 'Unmerged weight'),
            ('totalCompressions', 'Total compressions'),
        )
        for attribute, field in attribute_fields:
            setattr(self, attribute, reply[field])
86+
6887
def spaceHolder(response):
    """
    Placeholder callback: hand ``response`` back exactly as received.
    """
    return response
7089

@@ -87,7 +106,8 @@ class Client(Redis): #changed from StrictRedis
87106
- BF for Bloom Filter
88107
- CF for Cuckoo Filter
89108
- CMS for Count-Min Sketch
90-
- TopK for TopK Data Structure
109+
- TOPK for TopK Data Structure
110+
- TDIGEST for estimating rank statistics
91111
"""
92112

93113
BF_RESERVE = 'BF.RESERVE'
@@ -126,6 +146,16 @@ class Client(Redis): #changed from StrictRedis
126146
TOPK_LIST = 'TOPK.LIST'
127147
TOPK_INFO = 'TOPK.INFO'
128148

149+
TDIGEST_CREATE = 'TDIGEST.CREATE'
150+
TDIGEST_RESET = 'TDIGEST.RESET'
151+
TDIGEST_ADD = 'TDIGEST.ADD'
152+
TDIGEST_MERGE = 'TDIGEST.MERGE'
153+
TDIGEST_CDF = 'TDIGEST.CDF'
154+
TDIGEST_QUANTILE = 'TDIGEST.QUANTILE'
155+
TDIGEST_MIN = 'TDIGEST.MIN'
156+
TDIGEST_MAX = 'TDIGEST.MAX'
157+
TDIGEST_INFO = 'TDIGEST.INFO'
158+
129159
def __init__(self, *args, **kwargs):
130160
"""
131161
Creates a new RedisBloom client.
@@ -170,6 +200,16 @@ def __init__(self, *args, **kwargs):
170200
#self.TOPK_COUNT : spaceHolder,
171201
self.TOPK_LIST : parseToList,
172202
self.TOPK_INFO : TopKInfo,
203+
204+
self.TDIGEST_CREATE : bool_ok,
205+
# self.TDIGEST_RESET : bool_ok,
206+
# self.TDIGEST_ADD : spaceHolder,
207+
# self.TDIGEST_MERGE : spaceHolder,
208+
# self.TDIGEST_CDF : spaceHolder,
209+
# self.TDIGEST_QUANTILE : spaceHolder,
210+
# self.TDIGEST_MIN : spaceHolder,
211+
# self.TDIGEST_MAX : spaceHolder,
212+
self.TDIGEST_INFO : TDigestInfo,
173213
}
174214
for k, v in six.iteritems(MODULE_CALLBACKS):
175215
self.set_response_callback(k, v)
@@ -216,6 +256,12 @@ def appendItemsAndIncrements(params, items, increments):
216256
params.append(items[i])
217257
params.append(increments[i])
218258

259+
@staticmethod
260+
def appendValuesAndWeights(params, items, weights):
    """
    Append every item followed by its weight to ``params``, in place.

    ``items[i]`` is paired with ``weights[i]``; weights beyond
    ``len(items)`` are ignored. ``IndexError`` is raised when there are
    fewer weights than items, and in that case ``params`` is left untouched.
    """
    # Check up front so a short ``weights`` list cannot leave ``params``
    # half-filled.
    if len(weights) < len(items):
        raise IndexError(
            'weights has %d entries but %d items were given'
            % (len(weights), len(items)))

    for item, weight in zip(items, weights):
        params.append(item)
        params.append(weight)
264+
219265
@staticmethod
220266
def appendMaxIterations(params, max_iterations):
221267
if max_iterations is not None:
@@ -550,6 +596,83 @@ def topkInfo(self, key):
550596

551597
return self.execute_command(self.TOPK_INFO, key)
552598

599+
################## T-Digest Functions ######################
600+
601+
def tdigestCreate(self, key, compression):
    """
    Allocate the memory and initialize the t-digest ``key`` with the
    given ``compression``.
    """
    return self.execute_command(self.TDIGEST_CREATE, key, compression)
608+
609+
def tdigestReset(self, key):
    """
    Empty out the sketch ``key`` and re-initialize it, i.e. reset it to zero.
    """
    send = self.execute_command
    return send(self.TDIGEST_RESET, key)
615+
616+
def tdigestAdd(self, key, values, weights):
    """
    Add one or more samples to the sketch ``key``.

    ``values`` and ``weights`` are both lists; each value is sent together
    with the weight at the same position.
    Example - tdigestAdd('A', [1500.0], [1.0])
    """
    command_args = [key]
    # Extends command_args in place with value, weight, value, weight, ...
    self.appendValuesAndWeights(command_args, values, weights)
    return self.execute_command(self.TDIGEST_ADD, *command_args)
626+
627+
def tdigestMerge(self, toKey, fromKey):
    """
    Merge all of the values held by the 'fromKey' sketch into the 'toKey'
    sketch.
    """
    return self.execute_command(self.TDIGEST_MERGE, toKey, fromKey)
634+
635+
def tdigestMin(self, key):
    """
    Return the minimum value of the sketch ``key``.
    For an empty sketch the result is DBL_MAX.
    """
    send = self.execute_command
    return send(self.TDIGEST_MIN, key)
642+
643+
def tdigestMax(self, key):
    """
    Return the maximum value of the sketch ``key``.
    For an empty sketch the result is DBL_MIN.
    """
    send = self.execute_command
    return send(self.TDIGEST_MAX, key)
650+
651+
def tdigestQuantile(self, key, quantile):
    """
    Return a double-valued estimate of the cutoff for ``quantile``: the
    value such that the given fraction of the data added to this TDigest
    is less than or equal to it.
    """
    return self.execute_command(self.TDIGEST_QUANTILE, key, quantile)
659+
660+
def tdigestCdf(self, key, value):
    """
    Return, as a double, the fraction of all added points that are <= value.
    """
    return self.execute_command(self.TDIGEST_CDF, key, value)
667+
668+
def tdigestInfo(self, key):
    """
    Return information about the sketch ``key``: Compression, Capacity,
    Merged Nodes, Unmerged Nodes, Merged Weight, Unmerged Weight and
    Total Compressions.
    """
    send = self.execute_command
    return send(self.TDIGEST_INFO, key)
675+
553676
def pipeline(self, transaction=True, shard_hint=None):
554677
"""
555678
Return a new pipeline object that can queue multiple commands for

test_commands.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def testCreate(self):
3232
self.assertTrue(rb.cmsInitByDim('cmsDim', 100, 5))
3333
self.assertTrue(rb.cmsInitByProb('cmsProb', 0.01, 0.01))
3434
self.assertTrue(rb.topkReserve('topk', 5, 100, 5, 0.9))
35+
self.assertTrue(rb.tdigestCreate('tDigest', 100))
3536

3637
################### Test Bloom Filter ###################
3738
def testBFAdd(self):
@@ -204,6 +205,64 @@ def testTopK(self):
204205
self.assertEqual(3, info.depth)
205206
self.assertAlmostEqual(0.9, float(info.decay))
206207

208+
################### Test T-Digest ###################
209+
def testTDigestReset(self):
    # Resetting must work on an empty sketch as well as a populated one.
    sketch = 'tDigest'
    self.assertTrue(rb.tdigestCreate(sketch, 10))

    # reset before any sample has been added
    self.assertTrue(rb.tdigestReset(sketch))

    # add ten samples (0..9), each with weight 1.0, then reset again
    samples = list(range(10))
    self.assertTrue(rb.tdigestAdd(sketch, samples, [1.0] * 10))
    self.assertTrue(rb.tdigestReset(sketch))

    # the reset sketch must report 0 unmerged nodes
    info = rb.tdigestInfo(sketch)
    self.assertEqual(0, info.unmergedNodes)
219+
220+
def testTDigestMerge(self):
    # Merging must carry the source sketch's weight over to the target.
    target, source = 'to-tDigest', 'from-tDigest'
    self.assertTrue(rb.tdigestCreate(target, 10))
    self.assertTrue(rb.tdigestCreate(source, 10))

    # source: ten samples of weight 1.0; target: ten samples of weight 10.0
    self.assertTrue(rb.tdigestAdd(source, [1.0] * 10, [1.0] * 10))
    self.assertTrue(rb.tdigestAdd(target, [2.0] * 10, [10.0] * 10))

    # fold the source sketch into the target sketch
    self.assertTrue(rb.tdigestMerge(target, source))

    # 10 * 1.0 + 10 * 10.0 = 110 total weight expected on the target
    target_info = rb.tdigestInfo(target)
    merged = float(target_info.mergedWeight)
    unmerged = float(target_info.unmergedWeight)
    self.assertEqual(110, merged + unmerged)
232+
233+
def testTDigestMinMax(self):
    # Min and max must match the smallest and largest inserted sample.
    sketch = 'tDigest'
    self.assertTrue(rb.tdigestCreate(sketch, 100))

    # three samples, each with weight 1.0
    self.assertTrue(rb.tdigestAdd(sketch, [1, 2, 3], [1.0] * 3))

    largest = float(rb.tdigestMax(sketch))
    self.assertEqual(3, largest)
    smallest = float(rb.tdigestMin(sketch))
    self.assertEqual(1, smallest)
240+
241+
def testTDigestQuantile(self):
    self.assertTrue(rb.tdigestCreate('tDigest', 500))
    # insert data-points into sketch: 0.01, 0.02, ..., 99.99.
    # One weight per value: range(1, 10000) yields 9999 values, so the
    # weights list is sized from the values rather than hard-coded.
    values = [x * 0.01 for x in range(1, 10000)]
    self.assertTrue(rb.tdigestAdd('tDigest', values, [1.0] * len(values)))
    # assert min/max have the same result as quantile 0 and 1
    self.assertEqual(
        float(rb.tdigestMax('tDigest')),
        float(rb.tdigestQuantile('tDigest', 1.0)),
    )
    self.assertEqual(
        float(rb.tdigestMin('tDigest')),
        float(rb.tdigestQuantile('tDigest', 0.0)),
    )

    self.assertAlmostEqual(1.0, float(rb.tdigestQuantile('tDigest', 0.01)), 2)
    self.assertAlmostEqual(99.0, float(rb.tdigestQuantile('tDigest', 0.99)), 2)
257+
258+
def testTDigestCdf(self):
    self.assertTrue(rb.tdigestCreate('tDigest', 100))
    # insert data-points into sketch: 1, 2, ..., 9.
    # One weight per value: range(1, 10) yields 9 values, so the weights
    # list is sized from the values rather than hard-coded.
    values = list(range(1, 10))
    self.assertTrue(rb.tdigestAdd('tDigest', values, [1.0] * len(values)))

    self.assertAlmostEqual(0.1, float(rb.tdigestCdf('tDigest', 1.0)), 1)
    self.assertAlmostEqual(0.9, float(rb.tdigestCdf('tDigest', 9.0)), 1)
265+
207266
def test_pipeline(self):
208267
pipeline = rb.pipeline()
209268

0 commit comments

Comments
 (0)