14
14
import json
15
15
import jafka
16
16
import os
17
+ from collections import defaultdict
18
+ from itertools import chain
17
19
18
20
19
21
ZNODE_ACL = [{"perms" :31 ,"scheme" :"world" ,"id" :"anyone" }]
@@ -80,14 +82,32 @@ def list(self, path):
80
82
except kazoo .exceptions .NoNodeError :
81
83
return None
82
84
85
+ class Record ():
86
+ def __init__ (self , topic = None , broker_id = None , partition_id = None , consumer_offset = None , total_offset = None , backlog = None , consumer_id = None , lastmtime = None ):
87
+ self .topic = topic
88
+ self .broker_id = broker_id
89
+ self .partition_id = partition_id
90
+ self .consumer_offset = consumer_offset
91
+ self .total_offset = total_offset
92
+ self .backlog = backlog
93
+ self .consumer_id = consumer_id
94
+ self .lastmtime = lastmtime
95
+
96
+ def data (self , group ):
97
+ #('groupid', 'topic', 'part', 'consumeoffset', 'totaloffset', 'backlog', 'consumerid', 'lastmtime')
98
+ return (group , self .topic , self .broker_id + '-' + self .partition_id , self .consumer_offset
99
+ ,self .total_offset , self .total_offset - self .consumer_offset , self .consumer_id , self .lastmtime )
83
100
84
101
def main (zk ):
85
102
topics = zk .list ('/brokers/topics' )
86
103
brokerids = zk .list ('/brokers/ids' )
87
104
brokers = dict ((brokerid ,zk .get ('/brokers/ids/' + brokerid )) for brokerid in brokerids )
88
105
#brokers: brokerid => (host,port)
89
106
brokers = dict ((brokerid ,(v .split (':' )[1 ],int (v .split (':' )[2 ]))) for brokerid ,v in brokers .items ())
90
-
107
+ print ('brokers:' )
108
+ for broker_id ,(host ,port ) in brokers .items ():
109
+ print (' broker_id={} {}:{}' .format (broker_id , host , port ))
110
+ print ('=' * 120 )
91
111
#topic_broker_parts: topic=>((brokerid,parts),(brokerid,parts)...)
92
112
topic_broker_parts = {}
93
113
for topic in topics :
@@ -111,8 +131,8 @@ def main(zk):
111
131
ctopics = zk .list ('/consumers/%s/offsets' % group ) or []
112
132
113
133
#records: [(topic,broker,part,coffset,toffset,consumerid,lastmtime),...]
114
- records = []
115
- broker_records = {}
134
+ # records = []
135
+ broker_records = defaultdict ( list ) #
116
136
for ctopic in ctopics :
117
137
cparts = zk .list ('/consumers/%s/offsets/%s' % (group ,ctopic ))
118
138
for cpart in cparts :
@@ -125,43 +145,38 @@ def main(zk):
125
145
lastmtime = coffsetstats .mtime if coffsetstats else - 1
126
146
if lastmtime :
127
147
lastmtime = datetime .datetime .fromtimestamp (int (lastmtime )/ 1000 ).strftime ('%Y-%m-%d %H:%M:%S' )
128
- record = [ctopic ,cbroker ,cpartition ,coffset ,- 1 ,consumerid ,lastmtime ]
148
+ #record = [ctopic,cbroker,cpartition,coffset,-1,consumerid,lastmtime]
149
+ record = Record (topic = ctopic , broker_id = cbroker , partition_id = cpartition , consumer_offset = int (coffset ),
150
+ total_offset = 0 , backlog = - 1 , consumer_id = consumerid , lastmtime = lastmtime )
129
151
######################
130
- rds = broker_records .get (cbroker ,[])
131
- if not rds : broker_records [cbroker ] = rds
132
- rds .append (record )
133
- records .append (record )
134
-
135
- for broker ,rds in broker_records .items ():
136
- (host ,port ) = brokers [str (broker )]
137
- consumer = jafka .Consumer (host ,port )
152
+ broker_records [cbroker ].append (record )
153
+
154
+ for broker_id , record_list in broker_records .items ():
155
+ (host ,port ) = brokers [str (broker_id )]
156
+ consumer = jafka .Consumer (host , port )
138
157
try :
139
- for record in rds :
140
- toffset = consumer .getoffsetsbefore (record [0 ],int (record [2 ]),- 1 ,1 )[0 ]
141
- record [4 ] = toffset
158
+ for record in record_list :
159
+ record .total_offset = consumer .getoffsetsbefore (record .topic ,int (record .partition_id ), - 1 , 1 )[0 ]
142
160
finally :
143
161
consumer .close ()
144
162
145
- title = ('groupid' ,'topic' ,'part' ,'consumeoffset' ,'totaloffset' ,'backlog' ,'consumerid' ,'lastmtime' )
163
+ title = ('groupid' , 'topic' , 'part' , 'consumeoffset' , 'totaloffset' , 'backlog' , 'consumerid' , 'lastmtime' )
164
+
165
+ all_records = list ( chain (* broker_records .values ()) )
166
+ all_record_data = list ( x .data (group ) for x in all_records )
167
+
146
168
wid_sep = list (len (x ) for x in title )
147
- records = sorted (records ,key = lambda r :r [0 ]+ r [1 ]+ r [2 ])
148
- print_records = []
149
- for record in records :
150
- (ctopic ,cbroker ,cpartition ,coffset ,toffset ,consumerid ,lastmtime ) = record
151
- left = int (toffset ) - int (coffset )
152
- pr = (group ,ctopic ,cbroker + '-' + cpartition ,coffset ,toffset ,left ,consumerid ,lastmtime )
153
- print_records .append (pr )
154
- wid_sep_num = list (len (str (x )) for x in pr )
155
- for i in range (len (wid_sep )):
156
- if wid_sep [i ] < wid_sep_num [i ]:
157
- wid_sep [i ] = wid_sep_num [i ]
158
- format_sep = ' ' .join (list ('{:>' + str (x )+ '}' for x in wid_sep ))
159
- ptitle = format_sep .format (* title )
160
- print (ptitle )
161
- print ('-' * len (ptitle ))
162
- for record in print_records :
169
+ for data in all_record_data :
170
+ for i in range (len (data )):
171
+ wid_sep [i ] = max (wid_sep [i ], len (str (data [i ])))
172
+
173
+ format_sep = ' ' .join (list ('{:>' + str (x )+ '}' for x in wid_sep ))
174
+ ptr_title = format_sep .format (* title )
175
+ print (ptr_title )
176
+ print ('-' * len (ptr_title ))
177
+ for data in all_record_data :
163
178
#print(format_sep,' -> ',record)
164
- print (format_sep .format (* record ))
179
+ print (format_sep .format (* data ))
165
180
print ('\n ' )
166
181
167
182
0 commit comments