diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala index da9120d..cc2527d 100644 --- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala +++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala @@ -171,10 +171,9 @@ private[kinesis] case class KinesisReader( } private def listShards(): Seq[Shard] = { - var nextToken = "" - var returnedToken = "" + var nextToken: String = null val shards = new ArrayList[Shard]() - val listShardsRequest = new ListShardsRequest + var listShardsRequest = new ListShardsRequest listShardsRequest.setStreamName(streamName) listShardsRequest.setMaxResults(maxSupportedShardsPerStream) @@ -185,12 +184,14 @@ private[kinesis] case class KinesisReader( } } shards.addAll(listShardsResult.getShards) - returnedToken = listShardsResult.getNextToken() - if (returnedToken != null) { - nextToken = returnedToken + nextToken = listShardsResult.getNextToken() + if (nextToken != null) { + // Requests cannot contain both a stream name and a token + listShardsRequest = new ListShardsRequest listShardsRequest.setNextToken(nextToken) + listShardsRequest.setMaxResults(maxSupportedShardsPerStream) } - } while (!nextToken.isEmpty) + } while (nextToken != null) shards.asScala.toSeq }