Skip to content

Commit

Permalink
ZOOKEEPER-831. BookKeeper: Throttling improved for reads
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@998200 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
breed committed Sep 17, 2010
1 parent 550583b commit d1ee788
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 55 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ BUGFIXES:

ZOOKEEPER-870. Zookeeper trunk build broken. (mahadev via phunt)

ZOOKEEPER-831. BookKeeper: Throttling improved for reads (breed via fpj)

IMPROVEMENTS:
ZOOKEEPER-724. Improve junit test integration - log harness information
(phunt via mahadev)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class LedgerHandle implements ReadCallback, AddCallback, CloseCallback {
private Integer throttling = 5000;

final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();

LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
DigestType digestType, byte[] password)
throws GeneralSecurityException, NumberFormatException {
Expand Down Expand Up @@ -148,6 +148,15 @@ public DigestManager getDigestManager() {
return macManager;
}

/**
 * Returns the semaphore held in {@code opCounterSem}.
 *
 * Despite the method name, this does not return a permit count: the
 * caller receives the {@link Semaphore} object itself and must call
 * {@code availablePermits()} on it to learn how many slots are free.
 *
 * @return the {@code opCounterSem} semaphore of this ledger handle
 */
Semaphore getAvailablePermits(){
return this.opCounterSem;
}

/**
* Get the Distribution Schedule
*
Expand Down Expand Up @@ -277,7 +286,6 @@ public void asyncReadEntries(long firstEntry, long lastEntry,
}

new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
opCounterSem.acquire();
}

/**
Expand Down Expand Up @@ -310,26 +318,32 @@ public long addEntry(byte[] data) throws InterruptedException, BKException {
*/
public void asyncAddEntry(final byte[] data, final AddCallback cb,
final Object ctx) throws InterruptedException {
bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
if (metadata.isClosed()) {
LOG.warn("Attempt to add to closed ledger: " + ledgerId);
cb.addComplete(BKException.Code.LedgerClosedException,
LedgerHandle.this, -1, ctx);
return;
}

long entryId = ++lastAddPushed;
PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
pendingAddOps.add(op);
ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
entryId, lastAddConfirmed, data);
op.initiate(toSend);

}
});
opCounterSem.acquire();

try{
bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
if (metadata.isClosed()) {
LOG.warn("Attempt to add to closed ledger: " + ledgerId);
LedgerHandle.this.opCounterSem.release();
cb.addComplete(BKException.Code.LedgerClosedException,
LedgerHandle.this, -1, ctx);
return;
}

long entryId = ++lastAddPushed;
PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
pendingAddOps.add(op);
ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
entryId, lastAddConfirmed, data);
op.initiate(toSend);
}
});
} catch (RuntimeException e) {
opCounterSem.release();
throw e;
}
}

// close the ledger and send fails to all the adds in the pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,18 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
numPendingReads = endEntryId - startEntryId + 1;
}

public void initiate() {
public void initiate() throws InterruptedException {
long nextEnsembleChange = startEntryId, i = startEntryId;

ArrayList<InetSocketAddress> ensemble = null;
do {

if(LOG.isDebugEnabled()){
LOG.debug("Acquiring lock: " + i);
}

lh.opCounterSem.acquire();

if (i == nextEnsembleChange) {
ensemble = lh.metadata.getEnsemble(i);
nextEnsembleChange = lh.metadata.getNextEnsembleChange(i);
Expand All @@ -80,7 +86,6 @@ public void initiate() {
sendRead(ensemble, entry, BKException.Code.ReadException);

} while (i <= endEntryId);

}

void sendRead(ArrayList<InetSocketAddress> ensemble, LedgerEntry entry, int lastErrorCode) {
Expand Down Expand Up @@ -114,7 +119,6 @@ public void readEntryComplete(int rc, long ledgerId, final long entryId, final C
return;
}

numPendingReads--;
ChannelBufferInputStream is;
try {
is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
Expand All @@ -125,15 +129,23 @@ public void readEntryComplete(int rc, long ledgerId, final long entryId, final C

entry.entryDataStream = is;

numPendingReads--;
if (numPendingReads == 0) {
submitCallback(BKException.Code.OK);
}


if(LOG.isDebugEnabled()){
LOG.debug("Releasing lock: " + entryId);
}

lh.opCounterSem.release();

if(numPendingReads < 0)
LOG.error("Read too many values");
}

private void submitCallback(int code){
cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
lh.opCounterSem.release();
}
public boolean hasMoreElements() {
return !seq.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@

import java.io.File;
import java.io.IOException;
import java.lang.NoSuchFieldException;
import java.lang.IllegalAccessException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;


import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -83,7 +88,7 @@ public BookieReadWriteTest(DigestType digestType){
Set<Object> syncObjs;

class SyncObj {
int counter;
volatile int counter;
boolean value;

public SyncObj() {
Expand Down Expand Up @@ -237,37 +242,90 @@ public void testReadWriteAsyncSingleClient() throws IOException {
}
}

/**
 * Read callback used by the throttling test. Each completed read adds
 * a fixed amount to {@code sync.counter} and notifies one thread
 * waiting on {@code sync}.
 */
class ThrottleTestCallback implements ReadCallback {
    // Amount added to sync.counter for every completed read request.
    int throttle;

    /**
     * @param threshold value stored in {@link #throttle}
     */
    ThrottleTestCallback(int threshold) {
        throttle = threshold;
    }

    /**
     * Records the returned entries in {@code ls}, bumps the shared
     * counter by {@link #throttle} and notifies a waiter on {@code sync}.
     *
     * @param rc  return code of the read; anything but OK calls fail()
     * @param lh  ledger handle the read was issued on (unused here)
     * @param seq entries returned by the read
     * @param ctx caller context (unused here; the shared sync object is used)
     */
    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
        final boolean succeeded = (rc == BKException.Code.OK);
        if (!succeeded) {
            fail("Return code is not OK: " + rc);
        }

        ls = seq;
        synchronized (sync) {
            // Update and notify under the same monitor the test waits on.
            sync.counter = sync.counter + throttle;
            sync.notify();
        }
        LOG.info("Current counter: " + sync.counter);
    }
}

/**
 * Reads the number of permits currently available on the
 * {@code opCounterSem} semaphore of a ledger handle.
 *
 * The field is read through reflection to avoid adding a new public
 * method to the class.
 *
 * @param lh ledger handle whose {@code opCounterSem} field is inspected
 * @return the result of {@code availablePermits()} on that semaphore
 * @throws NoSuchFieldException if {@code LedgerHandle} declares no
 *         field named {@code opCounterSem}
 * @throws IllegalAccessException if the field value cannot be read
 */
int getAvailablePermits(LedgerHandle lh)
        throws NoSuchFieldException, IllegalAccessException {
    Field field = LedgerHandle.class.getDeclaredField("opCounterSem");
    // The field is not public, so access checks must be lifted first.
    field.setAccessible(true);
    // Field.get returns Object; the cast to the non-generic Semaphore is
    // a checked cast, so no unchecked-warning suppression is needed.
    return ((Semaphore) field.get(lh)).availablePermits();
}

@Test
public void testReadWriteAsyncSingleClientThrottle() throws IOException {
public void testReadWriteAsyncSingleClientThrottle() throws
IOException, NoSuchFieldException, IllegalAccessException {
try {

Integer throttle = 100;
ThrottleTestCallback tcb = new ThrottleTestCallback(throttle);
// Create a BookKeeper client and a ledger
System.setProperty("throttle", "1000");
System.setProperty("throttle", throttle.toString());
bkc = new BookKeeper("127.0.0.1");
lh = bkc.createLedger(digestType, ledgerPassword);
// bkc.initMessageDigest("SHA1");
ledgerId = lh.getId();
LOG.info("Ledger ID: " + lh.getId());

numEntriesToWrite = 20000;
for (int i = 0; i < (numEntriesToWrite - 10000); i++) {
numEntriesToWrite = 8000;
for (int i = 0; i < (numEntriesToWrite - 2000); i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);

entries.add(entry.array());
entriesSize.add(entry.array().length);
lh.asyncAddEntry(entry.array(), this, sync);
/*
* Check that the number of available permits never exceeds the throttling threshold
*/
int testValue = getAvailablePermits(lh);
assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
}


for (int i = 0; i < 10000; i++) {
for (int i = 0; i < 2000; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);

entries.add(entry.array());
entriesSize.add(entry.array().length);
lh.asyncAddEntry(entry.array(), this, sync);

/*
* Check that the number of available permits never exceeds the throttling threshold
*/
int testValue = getAvailablePermits(lh);
assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
}

// wait for all entries to be acknowledged
Expand All @@ -290,35 +348,22 @@ public void testReadWriteAsyncSingleClientThrottle() throws IOException {
assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));

// read entries
lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);

sync.counter = 0;
for (int i = 0; i < numEntriesToWrite; i+=throttle) {
lh.asyncReadEntries(i, i + throttle - 1, tcb, (Object) sync);
int testValue = getAvailablePermits(lh);
assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
}

synchronized (sync) {
while (sync.value == false) {
while (sync.counter < numEntriesToWrite) {
LOG.info("Entries counter = " + sync.counter);
sync.wait();
}
}

LOG.debug("*** READ COMPLETE ***");

// at this point, LedgerSequence ls is filled with the returned
// values
int i = 0;
while (ls.hasMoreElements()) {
ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
Integer origEntry = origbb.getInt();
byte[] entry = ls.nextElement().getEntry();
ByteBuffer result = ByteBuffer.wrap(entry);
LOG.debug("Length of result: " + result.capacity());
LOG.debug("Original entry: " + origEntry);

Integer retrEntry = result.getInt();
LOG.debug("Retrieved entry: " + retrEntry);
assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
i++;
}
assertTrue("Checking number of read entries", i == numEntriesToWrite);

lh.close();
} catch (KeeperException e) {
LOG.error("Test failed", e);
Expand Down Expand Up @@ -565,20 +610,25 @@ public void testMultiLedger() throws IOException {
}

/**
 * Add callback: counts one completed add on the {@code SyncObj} passed
 * as context and notifies one thread waiting on it.
 *
 * @param rc      return code of the add; anything but OK calls fail()
 * @param lh      ledger handle the add was issued on (unused here)
 * @param entryId id of the added entry (unused here)
 * @param ctx     context object, cast to {@code SyncObj}
 */
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
    if (rc != BKException.Code.OK) {
        fail("Return code is not OK: " + rc);
    }

    final SyncObj counterHolder = (SyncObj) ctx;
    synchronized (counterHolder) {
        // Increment and notify under the monitor of the context object.
        counterHolder.counter += 1;
        counterHolder.notify();
    }
}

/**
 * Read callback: stores the returned entries in {@code ls}, sets
 * {@code sync.value} to true and notifies one thread waiting on
 * {@code sync}.
 *
 * @param rc  return code of the read; anything but OK calls fail()
 * @param lh  ledger handle the read was issued on (unused here)
 * @param seq entries returned by the read
 * @param ctx caller context (unused here; the shared sync object is used)
 */
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
    final boolean succeeded = (rc == BKException.Code.OK);
    if (!succeeded) {
        fail("Return code is not OK: " + rc);
    }

    ls = seq;

    synchronized (sync) {
        // Flag completion under the monitor the test waits on.
        sync.value = true;
        sync.notify();
    }
}

@Before
Expand Down

0 comments on commit d1ee788

Please sign in to comment.