Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -576,22 +576,21 @@ private[spark] class IndexShuffleBlockResolver(

private[shuffle] def getChecksums(checksumFile: File, blockNum: Int): Array[Long] = {
if (!checksumFile.exists()) return null
val checksums = new ArrayBuffer[Long]
// Read the checksums of blocks
var in: DataInputStream = null
try {
in = new DataInputStream(new NioBufferedFileInputStream(checksumFile))
while (checksums.size < blockNum) {
checksums += in.readLong()
Utils.tryWithResource {
new DataInputStream(new NioBufferedFileInputStream(checksumFile))
} { in =>
val checksums = new ArrayBuffer[Long]
while (checksums.size < blockNum) {
checksums += in.readLong()
}
checksums.toArray
}
} catch {
case _: IOException | _: EOFException =>
return null
} finally {
in.close()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor note: there is one subtle difference from the old code. The original finally { in.close() } would propagate an exception thrown by close() out of the method; the new closeQuietly swallows it. Just pointing it out for awareness, so we can discuss if we're sure that we want to proceed like this.

null
}

checksums.toArray
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,22 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite {
val checksumsFromFile = resolver.getChecksums(checksumFile, 10)
assert(checksumsInMemory === checksumsFromFile)
}

test("SPARK-57504: getChecksums returns null instead of throwing NPE when the " +
"checksum file cannot be opened") {
val resolver = new IndexShuffleBlockResolver(conf, blockManager)
// If opening the checksum file fails, getChecksums is meant to return null: it
// already catches IOException/EOFException. But the old non-null-safe
// `finally { in.close() }` threw NPE when the failure came from the stream
// constructor (`in` was still null), masking the original error. The sibling
// methods checkIndexAndDataFile and getMergedBlockData already handle this
// correctly. Force a constructor failure with a file that appears to exist but
// cannot be opened.
val unopenableChecksumFile = new File(tempDir, "missing.checksum") {
override def exists(): Boolean = true
}
assert(resolver.getChecksums(unopenableChecksumFile, 10) === null)
}
}

class SslIndexShuffleBlockResolverSuite extends IndexShuffleBlockResolverSuite {
Expand Down