private synchronized void syncShardLeases()

in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncer.java [117:164]


    /**
     * Reconciles the lease table with the current set of shards.
     *
     * <p>The steps, in order:
     * <ol>
     *   <li>Fetch the shard list through {@code kinesisProxy} and index it by shard id and by
     *       parent/child relationship.</li>
     *   <li>Compute the set of inconsistent shard ids; unless {@code ignoreUnexpectedChildShards}
     *       is set, pass that set to {@code assertAllParentShardsAreClosed}.</li>
     *   <li>List the existing leases, determine which leases are missing, and create each one,
     *       recording a "CreateLease" success/latency metric per attempt.</li>
     *   <li>Run garbage-lease cleanup over the union of existing and newly created leases.</li>
     *   <li>If {@code cleanupLeasesOfCompletedShards} is set, run finished-shard lease cleanup.</li>
     * </ol>
     *
     * <p>The method is {@code synchronized}, so concurrent calls on the same instance are
     * serialized.
     *
     * @param kinesisProxy proxy used to obtain the shard list (also handed to garbage-lease cleanup)
     * @param leaseManager lease store used to list and create leases (also handed to the cleanup steps)
     * @param initialPosition starting position forwarded to {@code determineNewLeasesToCreate}
     * @param cleanupLeasesOfCompletedShards whether to run the finished-shard lease cleanup step
     * @param ignoreUnexpectedChildShards when {@code true}, skips the
     *        {@code assertAllParentShardsAreClosed} check on inconsistent shard ids
     * @throws DependencyException propagated from the lease manager or helper calls
     * @throws InvalidStateException propagated from the lease manager or helper calls
     * @throws ProvisionedThroughputException propagated from the lease manager or helper calls
     * @throws KinesisClientLibIOException propagated from the helper calls
     */
    private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
        ILeaseManager<KinesisClientLease> leaseManager,
        InitialPositionInStreamExtended initialPosition,
        boolean cleanupLeasesOfCompletedShards,
        boolean ignoreUnexpectedChildShards)
        throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
        LOG.info("syncShardLeases: begin");

        // Snapshot of the shards, plus the lookup structures derived from it.
        final List<Shard> allShards = getShardList(kinesisProxy);
        LOG.debug("Num shards: " + allShards.size());
        final Map<String, Shard> shardsById = constructShardIdToShardMap(allShards);
        final Map<String, Set<String>> childIdsByShardId = constructShardIdToChildShardIdsMap(shardsById);

        // The inconsistent ids are always computed because lease determination below needs them;
        // only the assertion on them is optional.
        final Set<String> inconsistentIds = findInconsistentShardIds(childIdsByShardId, shardsById);
        if (!ignoreUnexpectedChildShards) {
            assertAllParentShardsAreClosed(inconsistentIds);
        }

        final List<KinesisClientLease> existingLeases = leaseManager.listLeases();
        final List<KinesisClientLease> missingLeases =
            determineNewLeasesToCreate(allShards, existingLeases, initialPosition, inconsistentIds);
        LOG.debug("Num new leases to create: " + missingLeases.size());

        for (final KinesisClientLease missingLease : missingLeases) {
            final long createStartedAt = System.currentTimeMillis();
            boolean created = false;
            try {
                leaseManager.createLeaseIfNotExists(missingLease);
                created = true;
            } finally {
                // Emitted from finally so an attempt that throws is still recorded, as a failure.
                MetricsHelper.addSuccessAndLatency("CreateLease", createStartedAt, created, MetricsLevel.DETAILED);
            }
        }

        // Tracked leases = leases that already existed (if any were returned) + the ones just created.
        final List<KinesisClientLease> allKnownLeases = existingLeases == null
            ? new ArrayList<KinesisClientLease>()
            : new ArrayList<KinesisClientLease>(existingLeases);
        allKnownLeases.addAll(missingLeases);

        cleanupGarbageLeases(allShards, allKnownLeases, kinesisProxy, leaseManager);
        if (cleanupLeasesOfCompletedShards) {
            cleanupLeasesOfFinishedShards(existingLeases,
                shardsById,
                childIdsByShardId,
                allKnownLeases,
                leaseManager);
        }

        LOG.info("syncShardLeases: done");
    }