in streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/milvus/MilvusSink.java [153:234]
/**
 * Prepares the sink for event processing: connects to Milvus, selects (creating if
 * necessary) the configured database, reads the vector/collection settings from the
 * pipeline element configuration and makes sure the target collection exists.
 *
 * <p>If the collection already exists, its schema is validated against the event schema
 * of the first input stream; otherwise a new collection (including an index on the
 * vector field) is created from that event schema.
 *
 * @param parameters     sink parameters providing the static-property extractor and the
 *                       pipeline element model
 * @param runtimeContext runtime context supplied by the framework (not used here)
 * @throws SpRuntimeException if the connection, database or collection setup fails, or
 *                            if an existing collection's schema does not match the
 *                            event stream's schema; the original failure is attached
 *                            as the cause
 */
public void onInvocation(SinkParams parameters,
                         EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
  var extractor = parameters.extractor();
  final String uri = extractor.singleValueParameter(MILVUS_URI_KEY, String.class);
  final String token = extractor.singleValueParameter(MILVUS_TOKEN_KEY, String.class);

  ConnectConfig connectConfig = ConnectConfig.builder()
      .uri(uri)
      .token(token)
      .build();
  PoolConfig poolConfig = PoolConfig.builder()
      .maxIdlePerKey(10) // max idle clients per key
      .maxTotalPerKey(20) // max total (idle + active) clients per key
      .maxTotal(100) // max total clients for all keys
      .maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available
      // presumably: idle clients become eligible for eviction after 10 seconds - TODO confirm against pool docs
      .minEvictableIdleDuration(Duration.ofSeconds(10L))
      .build();

  try {
    // NOTE(review): pool and client are not released here if a later step fails;
    // verify that the sink's teardown path is invoked after a failed invocation.
    pool = new MilvusClientV2Pool(poolConfig, connectConfig);
    client = pool.getClient("client_name");

    // Select the configured database, creating it first if it does not exist yet.
    final String dbName = extractor.singleValueParameter(MILVUS_DBNAME_KEY, String.class);
    final String dbReplicaNum =
        extractor.singleValueParameter(DATABASE_REPLICA_NUMBER_KEY, String.class);
    ListDatabasesResp listDatabasesResp = client.listDatabases();
    List<String> dbNames = listDatabasesResp.getDatabaseNames();
    if (!dbNames.contains(dbName)) {
      Map<String, String> properties = new HashMap<>();
      properties.put(Constant.DATABASE_REPLICA_NUMBER, dbReplicaNum);
      CreateDatabaseReq createDatabaseReq = CreateDatabaseReq.builder()
          .databaseName(dbName)
          .properties(properties)
          .build();
      client.createDatabase(createDatabaseReq);
    }
    client.useDatabase(dbName);

    // The first four characters of the mapping value are dropped; presumably this is
    // the stream selector prefix (e.g. "s0::") - TODO confirm.
    this.vector = extractor.mappingPropertyValue(VECTOR_KEY).substring(4);
    this.vectorDataType = INDEX_MAP.get(extractor.selectedSingleValue(INDEX, String.class));
    this.primary = extractor.singleValueParameter(PRIMARY, String.class);
    this.dimension = Integer.valueOf(extractor.singleValueParameter(DIMENSION, String.class));
    this.metricType = METRIC_TYPE_MAP.get(extractor.selectedSingleValue(METRIC_TYPE, String.class));
    this.collectionName = extractor.singleValueParameter(COLLECTION_NAME_KEY, String.class);

    // Check whether the target collection already exists.
    HasCollectionReq hasCollectionReq = HasCollectionReq.builder()
        .collectionName(this.collectionName)
        .build();
    Boolean resp = client.hasCollection(hasCollectionReq);
    if (resp) {
      // Existing collection: its schema has to be compatible with the incoming events.
      DescribeCollectionReq describeCollectionReq = DescribeCollectionReq.builder()
          .collectionName(this.collectionName)
          .build();
      DescribeCollectionResp describeCollectionResp = client.describeCollection(describeCollectionReq);
      if (!validateEventSchema(parameters.getModel().getInputStreams().get(0).getEventSchema().getEventProperties(),
          "", describeCollectionResp.getCollectionSchema())) {
        throw new SpRuntimeException("The schema of the collection does not match the schema of the event stream");
      }
    } else {
      // Create a collection with schema; since indexParams is specified, the index is created as well.
      collectionSchema = client.createSchema();
      EventSchema schema = parameters.getModel().getInputStreams().get(0).getEventSchema();
      this.extractEventProperties(schema.getEventProperties(), "", collectionSchema);
      indexParam = IndexParam.builder()
          .fieldName(vector)
          .metricType(metricType)
          .build();
      CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
          .collectionName(collectionName)
          .collectionSchema(collectionSchema)
          .indexParams(Collections.singletonList(indexParam))
          .build();
      client.createCollection(createCollectionReq);
    }
  } catch (SpRuntimeException e) {
    // Already the exception type callers expect; rethrow unchanged to keep its stack trace.
    throw e;
  } catch (Exception e) {
    // Keep the original exception as the cause so the root failure stays diagnosable.
    throw new SpRuntimeException(e.getMessage(), e);
  }
}