Skip to content
Open
9 changes: 9 additions & 0 deletions .agents/references/testing-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ description: Testing frameworks, conventions, and commands for the MongoDB Java
- Descriptive method names: `shouldReturnEmptyListWhenNoDocumentsMatch()` not `test1()`
- Use `@DisplayName` for human-readable names
- Clean up test data in `@AfterEach` / `cleanup()` to prevent pollution
- Import types at the top of the file — do not use inline fully-qualified names
(`com.mongodb.connection.AsyncCompletionHandler`, `java.util.List`) in signatures or bodies
Comment thread
rozza marked this conversation as resolved.
- Use `AtomicInteger` / `AtomicBoolean` for mutable state captured by lambdas or anonymous
classes — not single-element array wrappers (`int[]`, `boolean[]`)
- Assert the most specific exception type the code guarantees
(`assertInstanceOf(MongoCommandException.class, e)`, not `Exception.class`)
- When testing that a side effect happens on condition X (e.g. connection closed on stream
desync), also test that it does NOT happen on the neighbouring condition Y (e.g. connection
stays open on a command error) — one-sided tests let behavioral regressions through

## Running Tests

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.mongodb.observability.micrometer.MongodbObservationContext;
import org.bson.BsonBinaryReader;
import org.bson.BsonDocument;
import org.bson.BsonSerializationException;
import org.bson.ByteBuf;
import org.bson.codecs.BsonDocumentCodec;
import org.bson.codecs.Decoder;
Expand Down Expand Up @@ -571,8 +572,8 @@ private <T> T receiveCommandMessageResponse(final Decoder<T> decoder, final Comm
new BsonDocumentCodec()), description.getServerAddress(), operationContext.getTimeoutContext());
}

commandSuccessful = true;
commandEventSender.sendSucceededEvent(responseBuffers);
commandSuccessful = true;

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why sendSucceededEvent(...) comes before commandSuccessful = true

commandSuccessful is the guard that prevents emitting both a succeeded and a failed monitoring event for a single command:

commandEventSender.sendSucceededEvent(responseBuffers);
commandSuccessful = true;
...
} catch (Throwable t) {
    if (!connectionIsReusable(t)) {
        close();
    }
    if (!commandSuccessful) {
        commandEventSender.sendFailedEvent(t);   // only if we never signalled success
    }
    ...
    throw t;
}

The subtlety is what happens when sendSucceededEvent itself throws. It isn't a no-op — it does real work before the listener is notified:

public void sendSucceededEvent(final ResponseBuffers responseBuffers) {
    sendSucceededEvent(responseBuffers.getResponseDocument(message.getId(), new RawBsonDocumentCodec()));
}

getResponseDocument(message.getId(), ...) builds a ReplyMessage, which validates responseTo against the request id and decodes the body. On a desync (responseTo mismatch) it throws MongoInternalException; on a corrupt body it throws BsonSerializationException — both while constructing the event, before commandListener.commandSucceeded(...) is ever called.

So with the current order:

  • sendSucceededEvent throws → commandSuccessful stays false → the catch sends a failed event. Correct: a desync/corrupt response should surface as a failure, and the listener was never told it succeeded.

If the two lines were swapped (the old order):

  • sendSucceededEvent throws → commandSuccessful is already true → the catch suppresses the failed event → monitoring gets a "started" with no terminal event. That's the bug this ordering avoids.

There's no double-event risk on the normal path either: if the listener itself throws, ProtocolHelper.sendCommandSucceededEvent wraps commandListener.commandSucceeded(...) in try/catch (Exception) and swallows it, so sendSucceededEvent returns normally and the catch is never entered.

This is covered by syncSendsFailedEventOnResponseToMismatchInOkResponse (and its async equivalent) and syncSendsFailedEventWhenResponseDocumentParsingFails, which assert exactly one started + one failed event.


T commandResult = getCommandResult(decoder, responseBuffers, responseTo, operationContext.getTimeoutContext());
hasMoreToCome = responseBuffers.getReplyHeader().hasMoreToCome();
Expand All @@ -583,20 +584,18 @@ private <T> T receiveCommandMessageResponse(final Decoder<T> decoder, final Comm
}

return commandResult;
} catch (Exception e) {
if (!commandSuccessful) {
commandEventSender.sendFailedEvent(e);
}
} catch (Throwable t) {
onCommandFailure(t, commandSuccessful, commandEventSender);
if (tracingSpan != null) {
if (e instanceof MongoCommandException) {
if (t instanceof MongoCommandException) {
MongodbObservationContext ctx = tracingSpan.getMongodbObservationContext();
if (ctx != null) {
ctx.setResponseStatusCode(String.valueOf(((MongoCommandException) e).getErrorCode()));
ctx.setResponseStatusCode(String.valueOf(((MongoCommandException) t).getErrorCode()));
}
}
tracingSpan.error(e);
tracingSpan.error(t);
}
throw e;
throw t;
Comment thread
rozza marked this conversation as resolved.
} finally {
if (tracingSpan != null) {
tracingSpan.closeScope();
Expand All @@ -605,6 +604,42 @@ private <T> T receiveCommandMessageResponse(final Decoder<T> decoder, final Comm
}
}

/**
* Determines whether a failure raised while processing a command response leaves the connection
* reusable. These exception types are only raised after the full framed response has been read off
* the wire, so the stream remains synchronized and the connection can be returned to the pool:
* <ul>
* <li>{@link MongoCommandException} — an {@code ok: 0} error response (and subclasses)</li>
* <li>{@link MongoWriteConcernWithResponseException} — a write concern error carrying the response</li>
* <li>{@link MongoOperationTimeoutException} — a write concern timeout, or a server-side
* {@code MaxTimeMSExpired} ({@code ok: 0}) timeout; both are derived from the response body</li>
* <li>{@link BsonSerializationException} — a corrupt BSON body whose exact byte count was still consumed</li>
* </ul>
* Any other failure (e.g. a responseTo mismatch or an unexpected error) may have left the stream
* desynchronized, so the connection must be closed to prevent pool reuse.
*/
private static boolean connectionIsReusable(final Throwable failure) {
return failure instanceof MongoCommandException
|| failure instanceof MongoWriteConcernWithResponseException
|| failure instanceof MongoOperationTimeoutException
|| failure instanceof BsonSerializationException;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we expand BsonSerializationException to BSONException?

}

/**
* Handles a failure raised while sending or processing a command, identically on the sync and async paths:
* closes the connection unless the failure leaves it {@linkplain #connectionIsReusable reusable}, and emits a
* command-failed event unless a succeeded event has already been sent.
*/
private void onCommandFailure(final Throwable failure, final boolean commandSuccessful,
final CommandEventSender commandEventSender) {
if (!connectionIsReusable(failure)) {
close();

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If close throws we're not sending the failed event ... maybe add a suppressed exception to make it resilient ?

try {
          if (!connectionIsReusable(failure)) {
              close();
          }
      } catch (Throwable closeFailure) {
          failure.addSuppressed(closeFailure)
      }
      if (!commandSuccessful) {
          sender.sendFailedEvent(failure);
      }

}
if (!commandSuccessful) {
commandEventSender.sendFailedEvent(failure);
}
}

private <T> void sendAndReceiveAsyncInternal(final CommandMessage message, final Decoder<T> decoder,
final OperationContext operationContext, final SingleResultCallback<T> callback) {
if (isClosed()) {
Expand Down Expand Up @@ -730,28 +765,33 @@ private <T> void sendCommandMessageAsync(final int messageId, final Decoder<T> d
return;
}
assertNotNull(responseBuffers);
T commandResult;
T commandResult = null;
boolean commandSuccessful = false;
Throwable failure = null;
try {
updateSessionContext(operationContext.getSessionContext(), responseBuffers);
boolean commandOk =
isCommandOk(new BsonBinaryReader(new ByteBufferBsonInput(responseBuffers.getBodyByteBuffer())));
responseBuffers.reset();
if (!commandOk) {
MongoException commandFailureException = getCommandFailureException(
throw getCommandFailureException(
responseBuffers.getResponseDocument(messageId, new BsonDocumentCodec()),
description.getServerAddress(), operationContext.getTimeoutContext());
commandEventSender.sendFailedEvent(commandFailureException);
throw commandFailureException;
}
commandEventSender.sendSucceededEvent(responseBuffers);
commandSuccessful = true;

commandResult = getCommandResult(decoder, responseBuffers, messageId, operationContext.getTimeoutContext());
} catch (Throwable localThrowable) {
callback.onResult(null, localThrowable);
return;
failure = localThrowable;
} finally {
responseBuffers.close();
}
if (failure != null) {
onCommandFailure(failure, commandSuccessful, commandEventSender);
callback.onResult(null, failure);
return;
}
callback.onResult(commandResult, null);
}));
}
Expand Down Expand Up @@ -783,9 +823,9 @@ public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId
}
try {
stream.write(byteBuffers, operationContext);
} catch (Exception e) {
} catch (Throwable t) {
close();
throwTranslatedWriteException(e, operationContext);
throwTranslatedWriteException(t, operationContext);
}
}

Expand All @@ -803,10 +843,10 @@ public void sendMessageAsync(
c.complete(c);
}).thenRunTryCatchAsyncBlocks(c -> {
stream.writeAsync(byteBuffers, operationContext, c.asHandler());
}, Exception.class, (e, c) -> {
}, Throwable.class, (t, c) -> {
try {
close();
throwTranslatedWriteException(e, operationContext);
throwTranslatedWriteException(t, operationContext);
} catch (Throwable translatedException) {
c.completeExceptionally(translatedException);
}
Expand Down Expand Up @@ -859,12 +899,12 @@ public void completed(@Nullable final ByteBuf buffer) {
@Override
public void failed(final Throwable t) {
close();
callback.onResult(null, translateReadException(t, operationContext));
callback.onResult(null, translateReadFailure(t, operationContext));
}
});
} catch (Exception e) {
} catch (Throwable t) {
close();
callback.onResult(null, translateReadException(e, operationContext));
callback.onResult(null, translateReadFailure(t, operationContext));
}
}

Expand All @@ -888,7 +928,19 @@ private void updateSessionContext(final SessionContext sessionContext, final Res
}
}

/**
* Rethrows a fatal JVM {@link Error} (e.g. {@link OutOfMemoryError}) unchanged, so it is never downgraded to a
* catchable {@link MongoException}. Used by the paths that propagate a failure by throwing; the async read path
* delivers the failure as a callback value instead and uses {@link #translateReadFailure} for the same purpose.
*/
private static void rethrowIfError(final Throwable t) {
if (t instanceof Error) {
throw (Error) t;
}
}

private void throwTranslatedWriteException(final Throwable e, final OperationContext operationContext) {
rethrowIfError(e);
if (e instanceof MongoSocketWriteTimeoutException && operationContext.getTimeoutContext().hasTimeoutMS()) {
throw createMongoTimeoutException(e);
}
Expand All @@ -906,6 +958,18 @@ private void throwTranslatedWriteException(final Throwable e, final OperationCon
}
}

/**
* Translates a read failure for delivery to an async callback. {@link Error}s are passed through unchanged
* rather than wrapped in a {@link MongoException}, so a fatal JVM error (e.g. {@link OutOfMemoryError}) is not
* downgraded to a catchable exception. The sync read path uses {@link #rethrowIfError} for the same purpose.
*/
private Throwable translateReadFailure(final Throwable e, final OperationContext operationContext) {
if (e instanceof Error) {
return e;
}
return translateReadException(e, operationContext);
}

private MongoException translateReadException(final Throwable e, final OperationContext operationContext) {
if (operationContext.getTimeoutContext().hasTimeoutMS()) {
if (e instanceof SocketTimeoutException) {
Expand Down Expand Up @@ -938,6 +1002,9 @@ private MongoSocketReadTimeoutException createReadTimeoutException(final Socket
}

private ResponseBuffers receiveResponseBuffers(final OperationContext operationContext) {
// The uncompressed buffer is allocated by us (not handed to ResponseBuffers until the last
// statement of the compressed branch), so it must be released if anything fails beforehand.
ByteBuf uncompressedBuffer = null;
try {
ByteBuf messageHeaderBuffer = stream.read(MESSAGE_HEADER_LENGTH, operationContext);
MessageHeader messageHeader;
Expand All @@ -955,11 +1022,11 @@ private ResponseBuffers receiveResponseBuffers(final OperationContext operationC

Compressor compressor = getCompressor(compressedHeader);

ByteBuf buffer = getBuffer(compressedHeader.getUncompressedSize());
compressor.uncompress(messageBuffer, buffer);
uncompressedBuffer = getBuffer(compressedHeader.getUncompressedSize());
compressor.uncompress(messageBuffer, uncompressedBuffer);

buffer.flip();
return new ResponseBuffers(new ReplyHeader(buffer, compressedHeader), buffer);
uncompressedBuffer.flip();
return new ResponseBuffers(new ReplyHeader(uncompressedBuffer, compressedHeader), uncompressedBuffer);
} else {
ResponseBuffers responseBuffers = new ResponseBuffers(new ReplyHeader(messageBuffer, messageHeader), messageBuffer);
releaseMessageBuffer = false;
Expand All @@ -971,7 +1038,11 @@ private ResponseBuffers receiveResponseBuffers(final OperationContext operationC
}
}
} catch (Throwable t) {
if (uncompressedBuffer != null) {
uncompressedBuffer.release();
}
close();
rethrowIfError(t);
throw translateReadException(t, operationContext);
}
}
Expand Down Expand Up @@ -1005,18 +1076,25 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t
callback.onResult(null, t);
return;
}
MessageHeader messageHeader = null;
Throwable headerParsingFailure = null;
try {
assertNotNull(result);
MessageHeader messageHeader = new MessageHeader(result, description.getMaxMessageSize());
readAsync(messageHeader.getMessageLength() - MESSAGE_HEADER_LENGTH, operationContext,
new MessageCallback(messageHeader));
messageHeader = new MessageHeader(result, description.getMaxMessageSize());
} catch (Throwable localThrowable) {
callback.onResult(null, localThrowable);
headerParsingFailure = localThrowable;
} finally {
if (result != null) {
result.release();
}
}
if (headerParsingFailure != null) {
close();
callback.onResult(null, headerParsingFailure);
return;
}
readAsync(messageHeader.getMessageLength() - MESSAGE_HEADER_LENGTH, operationContext,
new MessageCallback(messageHeader));
}

private class MessageCallback implements SingleResultCallback<ByteBuf> {
Expand All @@ -1034,19 +1112,24 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t
}
boolean releaseResult = true;
assertNotNull(result);
ResponseBuffers responseBuffers = null;
Throwable bodyParsingFailure = null;
// The uncompressed buffer is allocated by us and is not handed to ResponseBuffers until the
// last statement of the try, so it must be released if anything fails beforehand.
ByteBuf uncompressedBuffer = null;
try {
ReplyHeader replyHeader;
ByteBuf responseBuffer;
if (messageHeader.getOpCode() == OP_COMPRESSED.getValue()) {
try {
CompressedHeader compressedHeader = new CompressedHeader(result, messageHeader);
Compressor compressor = getCompressor(compressedHeader);
ByteBuf buffer = getBuffer(compressedHeader.getUncompressedSize());
compressor.uncompress(result, buffer);
uncompressedBuffer = getBuffer(compressedHeader.getUncompressedSize());
compressor.uncompress(result, uncompressedBuffer);

buffer.flip();
replyHeader = new ReplyHeader(buffer, compressedHeader);
responseBuffer = buffer;
uncompressedBuffer.flip();
replyHeader = new ReplyHeader(uncompressedBuffer, compressedHeader);
responseBuffer = uncompressedBuffer;
} finally {
releaseResult = false;
result.release();
Expand All @@ -1056,14 +1139,25 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t
responseBuffer = result;
releaseResult = false;
}
callback.onResult(new ResponseBuffers(replyHeader, responseBuffer), null);
// Must be the last statement in the try: ResponseBuffers now owns responseBuffer, so
// nothing that could throw may follow it, or the buffer would leak (it is not released below).
responseBuffers = new ResponseBuffers(replyHeader, responseBuffer);
} catch (Throwable localThrowable) {
Comment thread
rozza marked this conversation as resolved.
callback.onResult(null, localThrowable);
if (uncompressedBuffer != null) {
uncompressedBuffer.release();
}
bodyParsingFailure = localThrowable;
} finally {
if (releaseResult) {
result.release();
}
}
if (bodyParsingFailure != null) {
close();
callback.onResult(null, bodyParsingFailure);
return;
}
callback.onResult(responseBuffers, null);
}
}
}
Expand Down
Loading