public void onInvocation()

in streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/milvus/MilvusSink.java [153:234]


  /**
   * Prepares this sink for operation against Milvus.
   *
   * <p>The method
   * <ol>
   *   <li>builds a client pool from the configured URI and token and borrows one client,</li>
   *   <li>creates the configured database if it is not listed yet and switches to it,</li>
   *   <li>reads the vector field, index, primary key, dimension, metric type and collection
   *       name from the pipeline element configuration, and</li>
   *   <li>either validates the existing collection's schema against the event schema of the
   *       first input stream, or creates the collection (including its index) from that
   *       event schema.</li>
   * </ol>
   *
   * @param parameters     sink parameters providing the configuration extractor and the model
   * @param runtimeContext runtime context (not used by this method)
   * @throws SpRuntimeException if the existing collection schema does not match the event
   *                            schema, or if any step of the setup fails; in the latter case
   *                            the original exception is attached as the cause
   */
  public void onInvocation(SinkParams parameters,
                           EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
    var extractor = parameters.extractor();
    final String uri = extractor.singleValueParameter(MILVUS_URI_KEY, String.class);
    final String token = extractor.singleValueParameter(MILVUS_TOKEN_KEY, String.class);

    ConnectConfig connectConfig = ConnectConfig.builder()
        .uri(uri)
        .token(token)
        .build();

    PoolConfig poolConfig = PoolConfig.builder()
        .maxIdlePerKey(10) // max idle clients per key
        .maxTotalPerKey(20) // max total (idle + active) clients per key
        .maxTotal(100) // max total clients for all keys
        .maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() waits up to 5 seconds for an idle client
        .minEvictableIdleDuration(Duration.ofSeconds(10L)) // idle-time threshold for eviction (see PoolConfig docs)
        .build();

    try {
      // NOTE(review): if anything below fails, the pool and the borrowed client stay assigned
      // to the fields without being released here - confirm the teardown path covers that case.
      pool = new MilvusClientV2Pool(poolConfig, connectConfig);
      client = pool.getClient("client_name");

      final String dbName = extractor.singleValueParameter(MILVUS_DBNAME_KEY, String.class);
      final String dbReplicaNum =
              extractor.singleValueParameter(DATABASE_REPLICA_NUMBER_KEY, String.class);

      // Create the database only when it is not listed yet; the replica number is only
      // relevant for creation.
      List<String> existingDatabases = client.listDatabases().getDatabaseNames();
      if (!existingDatabases.contains(dbName)) {
        Map<String, String> properties = new HashMap<>();
        properties.put(Constant.DATABASE_REPLICA_NUMBER, dbReplicaNum);
        CreateDatabaseReq createDatabaseReq = CreateDatabaseReq.builder()
            .databaseName(dbName)
            .properties(properties)
            .build();
        client.createDatabase(createDatabaseReq);
      }
      client.useDatabase(dbName);

      // Drops the first four characters of the mapping selector - presumably the stream
      // prefix (e.g. "s0::"); TODO confirm.
      this.vector = extractor.mappingPropertyValue(VECTOR_KEY).substring(4);
      this.vectorDataType = INDEX_MAP.get(extractor.selectedSingleValue(INDEX, String.class));
      this.primary = extractor.singleValueParameter(PRIMARY, String.class);
      this.dimension = Integer.valueOf(extractor.singleValueParameter(DIMENSION, String.class));
      this.metricType = METRIC_TYPE_MAP.get(extractor.selectedSingleValue(METRIC_TYPE, String.class));
      this.collectionName = extractor.singleValueParameter(COLLECTION_NAME_KEY, String.class);

      // Check whether the configured collection already exists.
      HasCollectionReq hasCollectionReq = HasCollectionReq.builder()
          .collectionName(this.collectionName)
          .build();
      Boolean collectionExists = client.hasCollection(hasCollectionReq);
      if (collectionExists) {
        // Existing collection: its schema must be compatible with the incoming events.
        DescribeCollectionReq describeCollectionReq = DescribeCollectionReq.builder()
            .collectionName(this.collectionName)
            .build();
        DescribeCollectionResp describeCollectionResp = client.describeCollection(describeCollectionReq);
        if (!validateEventSchema(parameters.getModel().getInputStreams().get(0).getEventSchema().getEventProperties(),
                "", describeCollectionResp.getCollectionSchema())) {
          throw new SpRuntimeException("The schema of the collection does not match the schema of the event stream");
        }
      } else {
        // Create a collection with schema; since indexParams is specified, the index is created as well.
        collectionSchema = client.createSchema();
        EventSchema schema = parameters.getModel().getInputStreams().get(0).getEventSchema();
        this.extractEventProperties(schema.getEventProperties(), "", collectionSchema);
        indexParam = IndexParam.builder()
            .fieldName(vector)
            .metricType(metricType)
            .build();
        CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
            .collectionName(collectionName)
            .collectionSchema(collectionSchema)
            .indexParams(Collections.singletonList(indexParam))
            .build();
        client.createCollection(createCollectionReq);
      }
    } catch (SpRuntimeException e) {
      // Already the exception type callers expect; rethrow without re-wrapping so the
      // original stack trace is kept.
      throw e;
    } catch (Exception e) {
      // Same message as before, but keep the cause so the root failure stays diagnosable.
      throw new SpRuntimeException(e.getMessage(), e);
    }
  }