Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] refactor cursor read entry process to fix dead loop read issue of txn #22944

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -733,4 +733,6 @@ default CompletableFuture<Position> getLastDispatchablePosition(final Predicate<
}

Position getFirstPosition();

void updateMaxReadPosition(Position position);
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ public class ManagedCursorImpl implements ManagedCursor {
@SuppressWarnings("unused")
private volatile OpReadEntry waitingReadOp = null;

protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry>
WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION = AtomicReferenceFieldUpdater
.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOpByMaxReadPosition");
@SuppressWarnings("unused")
private volatile OpReadEntry waitingReadOpByMaxReadPosition = null;

public static final int FALSE = 0;
public static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> RESET_CURSOR_IN_PROGRESS_UPDATER =
Expand Down Expand Up @@ -1017,8 +1023,11 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re

int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes);

if (hasMoreEntries() && maxPosition.compareTo(readPosition) >= 0) {
boolean hasMoreEntries = hasMoreEntries();
boolean hasMoreEntriesByMaxReadPosition = hasMoreEntriesByMaxReadPosition();
if (hasMoreEntries && hasMoreEntriesByMaxReadPosition) {
// If we have available entries, we can read them immediately
// readPos <= lastConfirmedEntry and readPos <= maxReadPosition
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name);
}
Expand All @@ -1030,25 +1039,54 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
ctx, maxPosition, skipCondition);

if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
op.recycle();
callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
return;
}
if (hasMoreEntries) {
// We have available entries, but we can't read them immediately
// readPos <= lastConfirmedEntry and readPos > maxReadPosition
if (!WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.compareAndSet(this, null, op)) {
op.recycle();
callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Deferring retry of read at position {}", ledger.getName(), name, op.readPosition);
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Deferring retry of read at readPosition {} maxPosition {}",
ledger.getName(), name, op.readPosition, op.maxPosition);
}

// Check again for new entries after the configured time, then if still no entries are available register
// to be notified
if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
ledger.getScheduledExecutor()
.schedule(() -> checkForNewEntries(op, callback, ctx),
getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
// Check again for new entries after the configured time,
// then if still no entries are available register to be notified
if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
ledger.getScheduledExecutor()
.schedule(() -> checkForNewEntriesByMaxReadPosition(op, callback, ctx),
getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
} else {
// If there's no delay, check directly from the same thread
checkForNewEntriesByMaxReadPosition(op, callback, ctx);
}
} else {
// If there's no delay, check directly from the same thread
checkForNewEntries(op, callback, ctx);
// do not have available entries
// readPos > lastConfirmedEntry and readPos > maxReadPosition
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
op.recycle();
callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Deferring retry of read at position {}",
ledger.getName(), name, op.readPosition);
}

// Check again for new entries after the configured time,
// then if still no entries are available register to be notified
if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
ledger.getScheduledExecutor()
.schedule(() -> checkForNewEntries(op, callback, ctx),
getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
} else {
// If there's no delay, check directly from the same thread
checkForNewEntries(op, callback, ctx);
}
}
}
}
Expand Down Expand Up @@ -1108,6 +1146,63 @@ private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Ob
}
}

private void checkForNewEntriesByMaxReadPosition(OpReadEntry op, ReadEntriesCallback callback, Object ctx) {
try {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Re-trying the read at readPosition {} maxPosition {}",
ledger.getName(), name, op.readPosition, op.maxPosition);
}

if (isClosed()) {
callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to release ReadEntryOp here?

Copy link
Contributor Author

@TakaHiR07 TakaHiR07 Jun 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. need to recycle the op first. But this is better fixed by another pr.

return;
}

if (!hasMoreEntriesByMaxReadPosition()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Still no entries available by maxReadPosition. Register for notification",
ledger.getName(), name);
}
ledger.addWaitingCursorByMaxReadPosition(this);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skip notification registering since we do have entries "
+ "available by maxReadPosition",
ledger.getName(), name);
}
}

// Check again the entries count, since maxReadPosition may be changed between the time we
// checked and the time we've asked to be notified by managed ledger
if (hasMoreEntriesByMaxReadPosition()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Found more entries by maxReadPosition", ledger.getName(), name);
}
// Try to cancel the notification request
if (WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.compareAndSet(this, op, null)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Cancelled notification by maxReadPosition and scheduled read at {}",
ledger.getName(), name, op.readPosition);
}
PENDING_READ_OPS_UPDATER.incrementAndGet(this);
ledger.asyncReadEntries(op);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] notification by maxReadPosition was already cancelled",
ledger.getName(), name);
}
}
} else if (ledger.isTerminated()) {
// At this point we registered for notification and still there were no more available
// entries by maxReadPosition.
// If the managed ledger was indeed terminated, we need to notify the cursor
callback.readEntriesFailed(new NoMoreEntriesToReadException("Topic was terminated"), ctx);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to release ReadEntryOp?

}
} catch (Throwable t) {
callback.readEntriesFailed(new ManagedLedgerException(t), ctx);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to release ReadEntryOp?

}
}

@Override
public boolean isClosed() {
return state == State.Closed || state == State.Closing;
Expand All @@ -1119,14 +1214,21 @@ public boolean cancelPendingReadRequest() {
log.debug("[{}] [{}] Cancel pending read request", ledger.getName(), name);
}
final OpReadEntry op = WAITING_READ_OP_UPDATER.getAndSet(this, null);
final OpReadEntry opByMaxReadPosition =
WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.getAndSet(this, null);
if (op != null) {
op.recycle();
}
return op != null;
if (opByMaxReadPosition != null) {
opByMaxReadPosition.recycle();
}
// use || here because sometimes only exist one of the two waitingReadOp
return op != null || opByMaxReadPosition != null;
}

public boolean hasPendingReadRequest() {
return WAITING_READ_OP_UPDATER.get(this) != null;
return WAITING_READ_OP_UPDATER.get(this) != null
|| WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.get(this) != null;
}

@Override
Expand All @@ -1148,6 +1250,22 @@ public boolean hasMoreEntries() {
}
}

private boolean hasMoreEntriesByMaxReadPosition() {
Position maxReadPosition = ledger.getMaxReadPosition();
if (maxReadPosition.getEntryId() != -1) {
return readPosition.compareTo(maxReadPosition) <= 0;
} else {
// Fall back to checking the number of entries to ensure we are at the last entry in ledger and no ledgers
// are in the middle
Position maxReadPositionNext = PositionFactory.create(maxReadPosition.getLedgerId(), 0);
if (readPosition.compareTo(maxReadPositionNext) > 0) {
return false;
} else {
return getNumberOfEntries(Range.closedOpen(readPosition, maxReadPositionNext)) > 0;
}
}
}

@Override
public long getNumberOfEntries() {
if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) {
Expand Down Expand Up @@ -3441,6 +3559,40 @@ void notifyEntriesAvailable() {
}
}

/**
*
* @return Whether the cursor responded to the notification
*/
void notifyEntriesAvailableByMaxReadPosition() {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received ml notification by maxReadPosition", ledger.getName(), name);
}
OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.getAndSet(this, null);

if (opReadEntry != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received notification of maxReadPosition change, reading at {} -- mPos: {}",
ledger.getName(), name, opReadEntry.readPosition, ledger.maxReadPosition);
log.debug("[{}] Consumer {} cursor notification: other counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
}

PENDING_READ_OPS_UPDATER.incrementAndGet(this);
opReadEntry.readPosition = getReadPosition();
// Only ml.maxReadPosition change would trigger notifyEntriesAvailableByMaxReadPosition.
// Also update op.maxPosition since ml.maxReadPosition has been changed.
// This can avoid that opReadEntry directly finish and request readEntry once again
opReadEntry.maxPosition = ledger.maxReadPosition;
ledger.asyncReadEntries(opReadEntry);
} else {
// No one is waiting to be notified. Ignore
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received notification by maxReadPosition but had no pending read operation",
ledger.getName(), name);
}
}
}

void asyncCloseCursorLedger(final AsyncCallbacks.CloseCallback callback, final Object ctx) {
LedgerHandle lh = cursorLedger;
ledger.mbean.startCursorLedgerCloseOp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// Cursors that are waiting to be notified when new entries are persisted
final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursors;

// Cursors that are waiting to be notified when new maxReadPosition is updated
final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursorsByMaxReadPosition;

// Objects that are waiting to be notified when new entries are persisted
final ConcurrentLinkedQueue<WaitingEntryCallBack> waitingEntryCallBacks;

Expand Down Expand Up @@ -246,6 +249,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker;

volatile Position lastConfirmedEntry;
volatile Position maxReadPosition = PositionFactory.LATEST;

protected ManagedLedgerInterceptor managedLedgerInterceptor;

Expand Down Expand Up @@ -372,6 +376,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
}
this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
this.waitingCursors = Queues.newConcurrentLinkedQueue();
this.waitingCursorsByMaxReadPosition = Queues.newConcurrentLinkedQueue();
this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue();
this.uninitializedCursors = new HashMap();
this.clock = config.getClock();
Expand Down Expand Up @@ -2416,6 +2421,17 @@ Position startReadOperationOnLedger(Position position) {
return position;
}

void notifyCursorsByMaxReadPositionChanged() {
while (true) {
final ManagedCursorImpl waitingCursor = waitingCursorsByMaxReadPosition.poll();
if (waitingCursor == null) {
break;
}

executor.execute(waitingCursor::notifyEntriesAvailableByMaxReadPosition);
}
}

void notifyCursors() {
while (true) {
final ManagedCursorImpl waitingCursor = waitingCursors.poll();
Expand Down Expand Up @@ -3768,6 +3784,10 @@ Position getLastPosition() {
return lastConfirmedEntry;
}

Position getMaxReadPosition() {
return maxReadPosition;
}

@Override
public ManagedCursor getSlowestConsumer() {
return cursors.getSlowestReader();
Expand Down Expand Up @@ -3842,12 +3862,17 @@ private void deactivateCursorByName(String cursorName) {

public void removeWaitingCursor(ManagedCursor cursor) {
this.waitingCursors.remove(cursor);
this.waitingCursorsByMaxReadPosition.remove(cursor);
}

public void addWaitingCursor(ManagedCursorImpl cursor) {
this.waitingCursors.add(cursor);
}

public void addWaitingCursorByMaxReadPosition(ManagedCursorImpl cursor) {
this.waitingCursorsByMaxReadPosition.add(cursor);
}

public boolean isCursorActive(ManagedCursor cursor) {
return activeCursors.get(cursor.getName()) != null;
}
Expand Down Expand Up @@ -3997,7 +4022,7 @@ public long getLastLedgerCreationFailureTimestamp() {
}

public int getWaitingCursorsCount() {
return waitingCursors.size();
return waitingCursors.size() + waitingCursorsByMaxReadPosition.size();
}

@Override
Expand Down Expand Up @@ -4628,4 +4653,13 @@ public CompletableFuture<Position> getLastDispatchablePosition(final Predicate<E
return ManagedLedgerImplUtils
.asyncGetLastValidPosition(this, predicate, startPosition);
}

public void updateMaxReadPosition(Position position) {
if (position != null) {
this.maxReadPosition = position;
// When maxReadPosition is updated, can notify the cursor
// waiting for maxReadPosition to update which can be read
this.notifyCursorsByMaxReadPositionChanged();
}
}
}
Loading
Loading