1
1
/**
2
2
* Copyright 2010 the original author or authors.
3
- *
3
+ * <p>
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
6
6
* You may obtain a copy of the License at
7
- *
7
+ * <p>
8
8
* http://www.apache.org/licenses/LICENSE-2.0
9
- *
9
+ * <p>
10
10
* Unless required by applicable law or agreed to in writing, software
11
11
* distributed under the License is distributed on an "AS IS" BASIS,
12
12
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
15
*/
16
16
package com .github .zkclient ;
17
17
18
- import java .io .File ;
19
- import java .io .IOException ;
20
-
21
- import javax .annotation .PostConstruct ;
22
- import javax .annotation .PreDestroy ;
23
-
24
- import org .apache .zookeeper .server .ZooKeeperServer ;
25
-
26
18
import com .github .zkclient .exception .ZkException ;
27
- import com .github .zkclient .exception .ZkInterruptedException ;
19
+ import org .apache .zookeeper .client .FourLetterWordMain ;
20
+ import org .apache .zookeeper .server .ServerConfig ;
21
+ import org .apache .zookeeper .server .ZooKeeperServerMain ;
28
22
import org .slf4j .Logger ;
29
23
import org .slf4j .LoggerFactory ;
30
24
31
- public class ZkServer {
25
+ import javax .annotation .PostConstruct ;
26
+ import javax .annotation .PreDestroy ;
27
+ import java .io .File ;
28
+ import java .net .ConnectException ;
32
29
33
- private final static Logger LOG = LoggerFactory .getLogger (ZkServer .class );;
30
+ public class ZkServer extends ZooKeeperServerMain {
31
+
32
+ private final static Logger LOG = LoggerFactory .getLogger (ZkServer .class );
33
+ ;
34
34
35
35
public static final int DEFAULT_PORT = 2181 ;
36
36
@@ -42,10 +42,6 @@ public class ZkServer {
42
42
43
43
private final String _logDir ;
44
44
45
- private ZooKeeperServer _zk ;
46
-
47
- private ServerCnxnFactory _nioFactory ;
48
-
49
45
private ZkClient _zkClient ;
50
46
51
47
private final int _port ;
@@ -54,6 +50,10 @@ public class ZkServer {
54
50
55
51
private final int _minSessionTimeout ;
56
52
53
+ private volatile boolean shutdown = false ;
54
+
55
+ private boolean daemon = true ;
56
+
57
57
public ZkServer (String dataDir , String logDir ) {
58
58
this (dataDir , logDir , DEFAULT_PORT );
59
59
}
@@ -80,6 +80,7 @@ public int getPort() {
80
80
81
81
@ PostConstruct
82
82
public void start () {
83
+ shutdown = false ;
83
84
startZkServer ();
84
85
_zkClient = new ZkClient ("localhost:" + _port , 10000 );
85
86
}
@@ -90,68 +91,114 @@ private void startZkServer() {
90
91
final File dataDir = new File (_dataDir );
91
92
final File dataLogDir = new File (_logDir );
92
93
dataDir .mkdirs ();
93
- dataLogDir .mkdirs ();
94
94
95
95
// single zk server
96
- LOG .info ("Start single zookeeper server..." );
97
- LOG .info ("data dir: " + dataDir .getAbsolutePath ());
98
- LOG .info ("data log dir: " + dataLogDir .getAbsolutePath ());
99
- startSingleZkServer (_tickTime , dataDir , dataLogDir , port );
96
+ LOG .info ("Start single zookeeper server, port={} data={} " , port , dataDir .getAbsolutePath ());
97
+ //
98
+ final ZooKeeperServerMain serverMain = this ;
99
+ final InnerServerConfig config = new InnerServerConfig ();
100
+ config .parse (new String []{"" + port , dataDir .getAbsolutePath (), "" + _tickTime , "60" });
101
+ config .setMinSessionTimeout (_minSessionTimeout );
102
+ //
103
+ final String threadName = "inner-zkserver-" + port ;
104
+ final Thread innerThread = new Thread (new Runnable () {
105
+ @ Override
106
+ public void run () {
107
+ try {
108
+ serverMain .runFromConfig (config );
109
+ } catch (Exception e ) {
110
+ throw new ZkException ("Unable to start single ZooKeeper server." , e );
111
+ }
112
+ }
113
+ }, threadName );
114
+ innerThread .setDaemon (daemon );
115
+ innerThread .start ();
116
+ //
117
+ waitForServerUp (port , 30000 , false );
118
+
100
119
} else {
101
120
throw new IllegalStateException ("Zookeeper port " + port + " was already in use. Running in single machine mode?" );
102
121
}
103
122
}
104
123
105
- private void startSingleZkServer (final int tickTime , final File dataDir , final File dataLogDir , final int port ) {
106
- try {
107
- _zk = new ZooKeeperServer (dataDir , dataLogDir , tickTime );
108
- _zk .setMinSessionTimeout (_minSessionTimeout );
109
- _nioFactory = ServerCnxnFactory .createFactory (port , 60 );
110
- _nioFactory .startup (_zk );
111
- } catch (IOException e ) {
112
- throw new ZkException ("Unable to start single ZooKeeper server." , e );
113
- } catch (InterruptedException e ) {
114
- throw new ZkInterruptedException (e );
115
- }
116
- }
117
-
118
124
@ PreDestroy
119
125
public void shutdown () {
120
- ZooKeeperServer zk = _zk ;
121
- if (zk == null ) {
122
- LOG .warn ("shutdown duplication" );
123
- return ;
124
- }else {
125
- _zk = null ;
126
+ if (!shutdown ) {
127
+ shutdown = true ;
128
+ LOG .info ("Shutting down ZkServer port={}..." , _port );
129
+ if (_zkClient != null ) {
130
+ try {
131
+ _zkClient .close ();
132
+ } catch (ZkException e ) {
133
+ LOG .warn ("Error on closing zkclient: " + e .getClass ().getName ());
134
+ }
135
+ _zkClient = null ;
136
+ }
137
+ super .shutdown ();
138
+ waitForServerDown (_port , 30000 , false );
139
+ LOG .info ("Shutting down ZkServer port={}...done" , _port );
126
140
}
127
- LOG .info ("Shutting down ZkServer..." );
128
- try {
129
- _zkClient .close ();
130
- } catch (ZkException e ) {
131
- LOG .warn ("Error on closing zkclient: " + e .getClass ().getName ());
141
+ }
142
+
143
+
144
+ public ZkClient getZkClient () {
145
+ return _zkClient ;
146
+ }
147
+
148
+ class InnerServerConfig extends ServerConfig {
149
+ public void setMinSessionTimeout (int minSessionTimeout ) {
150
+ this .minSessionTimeout = minSessionTimeout ;
132
151
}
133
- if (_nioFactory != null ) {
134
- _nioFactory .shutdown ();
152
+ }
153
+
154
+ public static boolean waitForServerUp (int port , long timeout , boolean secure ) {
155
+ long start = System .currentTimeMillis ();
156
+ while (true ) {
135
157
try {
136
- _nioFactory .join ();
137
- } catch (InterruptedException e ) {
138
- Thread .currentThread ().interrupt ();
158
+ // if there are multiple hostports, just take the first one
159
+ String result = FourLetterWordMain .send4LetterWord ("127.0.0.1" , port , "stat" );
160
+ if (result .startsWith ("Zookeeper version:" ) &&
161
+ !result .contains ("READ-ONLY" )) {
162
+ return true ;
163
+ }
164
+ } catch (ConnectException e ) {
165
+ // ignore as this is expected, do not log stacktrace
166
+ LOG .debug ("server {} not up: {}" , port , e .toString ());
167
+ } catch (Exception e ) {
168
+ // ignore as this is expected
169
+ LOG .info ("server {} not up" , port , e );
170
+ }
171
+
172
+ if (System .currentTimeMillis () > start + timeout ) {
173
+ break ;
139
174
}
140
- _nioFactory = null ;
141
- }
142
- zk .shutdown ();
143
- if (zk .getZKDatabase () != null ) {
144
175
try {
145
- // release file description
146
- zk .getZKDatabase ().close ();
147
- } catch (IOException e ) {
148
- LOG .error (e .getMessage (), e );
176
+ Thread .sleep (250 );
177
+ } catch (InterruptedException e ) {
178
+ // ignore
149
179
}
150
180
}
151
- LOG . info ( "Shutting down ZkServer...done" ) ;
181
+ return false ;
152
182
}
153
183
154
- public ZkClient getZkClient () {
155
- return _zkClient ;
184
+ public static boolean waitForServerDown (int port , long timeout , boolean secure ) {
185
+ long start = System .currentTimeMillis ();
186
+ while (true ) {
187
+ try {
188
+ FourLetterWordMain .send4LetterWord ("127.0.0.1" , port , "stat" );
189
+ } catch (Exception e ) {
190
+ return true ;
191
+ }
192
+
193
+ if (System .currentTimeMillis () > start + timeout ) {
194
+ break ;
195
+ }
196
+ try {
197
+ Thread .sleep (250 );
198
+ } catch (InterruptedException e ) {
199
+ // ignore
200
+ }
201
+ }
202
+ return false ;
156
203
}
157
204
}
0 commit comments