in gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java [104:169]
/**
 * Builds the output schema for an entity from metadata fetched through the REST connector.
 *
 * <p>The method connects, runs the commands returned by {@code getSchemaMetadata}, converts the
 * response into a JSON array via {@code getSchema}, and then, for every column in that array:
 * <ul>
 *   <li>marks it as a watermark column when it matches {@code extract.delta.fields} (and forces
 *       it to be non-nullable);</li>
 *   <li>marks it nullable when it is neither a watermark nor a primary-key column;</li>
 *   <li>records its primary-key index based on {@code extract.primary.key.fields}.</li>
 * </ul>
 * A column is kept when no query is configured, when the query's column list is exactly
 * {@code *}, or when {@code isMetadataColumn} accepts it for the query's column list. When no
 * query is configured, columns named in the excluded-columns property are dropped
 * (case-insensitive match). Kept columns are appended to {@code this.columnList} and to the
 * output schema; {@code this.updatedQuery} is set from {@code buildDataQuery}.
 *
 * @param schema   passed through to {@code getSchemaMetadata}
 * @param entity   passed through to {@code getSchemaMetadata} and {@code buildDataQuery}
 * @param workUnit not read by this method; configuration is taken from {@code workUnitState}
 * @throws SchemaException if the connection fails or any error occurs while fetching or
 *         processing the schema (the original error is attached as the cause)
 */
public void extractMetadata(String schema, String entity, WorkUnit workUnit) throws SchemaException {
  log.info("Extract Metadata using Rest Api");
  JsonArray columnArray = new JsonArray();

  String inputQuery = workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_QUERY);
  List<String> columnListInQuery = null;
  if (!Strings.isNullOrEmpty(inputQuery)) {
    columnListInQuery = extractColumnListInQuery(inputQuery);
  }

  // Exclusions are honoured only when no explicit query is configured.
  String excludedColumns = workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_EXCLUDED_COLUMNS);
  List<String> columnListExcluded = ImmutableList.<String> of();
  if (Strings.isNullOrEmpty(inputQuery) && !Strings.isNullOrEmpty(excludedColumns)) {
    Splitter splitter = Splitter.on(",").omitEmptyStrings().trimResults();
    // Locale.ROOT keeps the case folding independent of the JVM default locale
    // (e.g. Turkish dotless-i), so ASCII column names always compare equal.
    columnListExcluded = splitter.splitToList(excludedColumns.toLowerCase(java.util.Locale.ROOT));
  }

  try {
    boolean success = this.connector.connect();
    if (!success) {
      throw new SchemaException("Failed to connect.");
    }
    log.debug("Connected successfully.");

    List<Command> cmds = this.getSchemaMetadata(schema, entity);
    CommandOutput<?, ?> response = this.connector.getResponse(cmds);
    JsonArray array = this.getSchema(response);

    // Loop-invariant inputs: read the properties and evaluate the "select everything" test once.
    String deltaFields = workUnitState.getProp("extract.delta.fields");
    String primaryKeyFields = workUnitState.getProp("extract.primary.key.fields");
    // A null column list means no (non-empty) query was configured.
    boolean selectAllColumns = columnListInQuery == null
        || (columnListInQuery.size() == 1 && "*".equals(columnListInQuery.get(0)));

    for (JsonElement columnElement : array) {
      Schema obj = GSON.fromJson(columnElement, Schema.class);
      String columnName = obj.getColumnName();

      boolean watermarkColumn = this.isWatermarkColumn(deltaFields, columnName);
      int primaryKeyIndex = this.getPrimarykeyIndex(primaryKeyFields, columnName);

      obj.setWaterMark(watermarkColumn);
      if (watermarkColumn) {
        obj.setNullable(false);
      } else if (primaryKeyIndex == 0) {
        // set all columns as nullable except primary key and watermark columns
        obj.setNullable(true);
      }
      obj.setPrimaryKey(primaryKeyIndex);

      String jsonStr = GSON.toJson(obj);
      JsonObject jsonObject = GSON.fromJson(jsonStr, JsonObject.class);

      // If input query is null or provided '*' in the query select all columns.
      // Else, consider only the columns mentioned in the column list
      boolean selected = selectAllColumns
          || (!columnListInQuery.isEmpty() && this.isMetadataColumn(columnName, columnListInQuery));
      if (selected && !columnListExcluded.contains(columnName.trim().toLowerCase(java.util.Locale.ROOT))) {
        this.columnList.add(columnName);
        columnArray.add(jsonObject);
      }
    }

    this.updatedQuery = buildDataQuery(inputQuery, entity);
    log.info("Schema:" + columnArray);
    this.setOutputSchema(columnArray);
  } catch (RuntimeException | RestApiProcessingException | RestApiConnectionException | IOException
      | SchemaException e) {
    throw new SchemaException("Failed to get schema using rest api; error - " + e.getMessage(), e);
  }
}