public void open()

in pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java [76:153]


    /**
     * Opens the MongoDB source and starts forwarding change-stream events as records.
     *
     * <p>The configuration map is loaded into a {@code MongoConfig} and validated. The Mongo
     * client comes from {@code clientProvider} when one is set, otherwise a new client is
     * created from the configured URI. The scope of the change stream depends on the
     * configuration:
     * <ul>
     *   <li>no database configured: the stream is opened on the client itself;</li>
     *   <li>database but no collection configured: the stream is opened on that database;</li>
     *   <li>database and collection configured: the stream is opened on that collection.</li>
     * </ul>
     *
     * <p>Each change event that carries a document key is turned into a {@code DocRecord}
     * whose key is the JSON form of the document key and whose value is a UTF-8 JSON object
     * with the entries {@code fullDocument}, {@code ns} and {@code operation}. Events without
     * a document key are logged and skipped.
     *
     * @param config        connector configuration, passed to {@code MongoConfig.load}
     * @param sourceContext source context supplied by the caller; not used by this method
     * @throws Exception if loading or validating the configuration, creating the client, or
     *                   opening the change stream fails
     */
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        log.info("Open MongoDB Source");

        mongoConfig = MongoConfig.load(config);
        // NOTE(review): the meaning of the two boolean flags is defined by MongoConfig.validate,
        // which is not visible here -- confirm before changing them.
        mongoConfig.validate(false, false);

        // Prefer an externally supplied client (when a provider was set) over building one
        // from the configured URI.
        if (clientProvider != null) {
            mongoClient = clientProvider.get();
        } else {
            mongoClient = MongoClients.create(mongoConfig.getMongoUri());
        }

        if (StringUtils.isEmpty(mongoConfig.getDatabase())) {
            // Watch all databases
            log.info("Watch all");
            stream = mongoClient.watch();

        } else {
            final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase());

            if (StringUtils.isEmpty(mongoConfig.getCollection())) {
                // Watch all collections in a database
                log.info("Watch db: {}", db.getName());
                stream = db.watch();

            } else {
                // Watch a collection

                final MongoCollection<Document> collection = db.getCollection(mongoConfig.getCollection());
                log.info("Watch collection: {} {}", db.getName(), mongoConfig.getCollection());
                stream = collection.watch();
            }
        }

        // UPDATE_LOOKUP asks the server to attach the current full document to update events.
        stream.batchSize(mongoConfig.getBatchSize()).fullDocument(FullDocument.UPDATE_LOOKUP);

        stream.subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
            private final ObjectMapper mapper = new ObjectMapper();
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                // Single up-front demand; no further request() calls are made afterwards.
                this.subscription.request(Integer.MAX_VALUE);
            }

            @Override
            public void onNext(ChangeStreamDocument<Document> doc) {
                try {
                    log.info("New change doc: {}", doc);

                    // Events that are not tied to a single document (e.g. drop, rename,
                    // invalidate) carry no document key. Dereferencing it would throw a
                    // NullPointerException out of this callback, so such events are skipped.
                    final Optional<String> recordKey =
                            Optional.ofNullable(doc.getDocumentKey()).map(key -> key.toJson());
                    if (!recordKey.isPresent()) {
                        log.warn("Skipping change doc without a document key: operation={} ns={}",
                                doc.getOperationType(), doc.getNamespace());
                        return;
                    }

                    // Build a record with the essential information
                    final Map<String, Object> recordValue = new HashMap<>();
                    recordValue.put("fullDocument", doc.getFullDocument());
                    recordValue.put("ns", doc.getNamespace());
                    recordValue.put("operation", doc.getOperationType());

                    consume(new DocRecord(
                            recordKey,
                            mapper.writeValueAsString(recordValue).getBytes(StandardCharsets.UTF_8)));

                } catch (JsonProcessingException e) {
                    // A document that cannot be serialized is logged and dropped.
                    log.error("Processing doc from mongo", e);
                }
            }

            @Override
            public void onError(Throwable error) {
                log.error("Subscriber error", error);
            }

            @Override
            public void onComplete() {
                log.info("Subscriber complete");
            }
        });

    }