From 12cb6f687400df6facd0b6d1a1294cd775cee251 Mon Sep 17 00:00:00 2001 From: Anthony Huang Date: Tue, 19 Oct 2021 19:58:23 -0400 Subject: [PATCH] Fix listShards to only specify nextToken --- .../apache/spark/sql/kinesis/KinesisReader.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala index da9120d..cc2527d 100644 --- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala +++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala @@ -171,10 +171,9 @@ private[kinesis] case class KinesisReader( } private def listShards(): Seq[Shard] = { - var nextToken = "" - var returnedToken = "" + var nextToken: String = null val shards = new ArrayList[Shard]() - val listShardsRequest = new ListShardsRequest + var listShardsRequest = new ListShardsRequest listShardsRequest.setStreamName(streamName) listShardsRequest.setMaxResults(maxSupportedShardsPerStream) @@ -185,12 +184,14 @@ private[kinesis] case class KinesisReader( } } shards.addAll(listShardsResult.getShards) - returnedToken = listShardsResult.getNextToken() - if (returnedToken != null) { - nextToken = returnedToken + nextToken = listShardsResult.getNextToken() + if (nextToken != null) { + // Requests cannot contain both a stream name and a token + listShardsRequest = new ListShardsRequest listShardsRequest.setNextToken(nextToken) + listShardsRequest.setMaxResults(maxSupportedShardsPerStream) } - } while (!nextToken.isEmpty) + } while (nextToken != null) shards.asScala.toSeq }