in web-console/src/views/datasources-view/datasources-view.tsx [420:671]
constructor(props: DatasourcesViewProps) {
super(props);
this.state = {
datasourcesAndDefaultRulesState: QueryState.INIT,
showUnused: false,
useUnuseAction: 'unuse',
useUnuseInterval: '',
showForceCompact: false,
visibleColumns: new LocalStorageBackedVisibility(
LocalStorageKeys.DATASOURCE_TABLE_COLUMN_SELECTION,
['Segment size', 'Segment granularity'],
),
actions: [],
};
this.datasourceQueryManager = new QueryManager<DatasourceQuery, DatasourcesAndDefaultRules>({
processQuery: async (
{ capabilities, visibleColumns, showUnused },
cancelToken,
setIntermediateQuery,
) => {
let datasources: DatasourceQueryResultRow[];
if (capabilities.hasSql()) {
const query = DatasourcesView.query(visibleColumns);
setIntermediateQuery(query);
datasources = await queryDruidSql({ query }, cancelToken);
} else if (capabilities.hasCoordinatorAccess()) {
const datasourcesResp = await getApiArray(
'/druid/coordinator/v1/datasources?simple',
cancelToken,
);
const loadstatusResp = await Api.instance.get('/druid/coordinator/v1/loadstatus?simple', {
cancelToken,
});
const loadstatus = loadstatusResp.data;
datasources = datasourcesResp.map((d: any): DatasourceQueryResultRow => {
const totalDataSize = deepGet(d, 'properties.segments.size') || -1;
const segmentsToLoad = Number(loadstatus[d.name] || 0);
const availableSegments = Number(deepGet(d, 'properties.segments.count'));
const numSegments = availableSegments + segmentsToLoad;
return {
datasource: d.name,
num_segments: numSegments,
num_zero_replica_segments: 0,
num_segments_to_load: segmentsToLoad,
num_segments_to_drop: 0,
minute_aligned_segments: -1,
hour_aligned_segments: -1,
day_aligned_segments: -1,
month_aligned_segments: -1,
year_aligned_segments: -1,
all_granularity_segments: -1,
replicated_size: -1,
total_data_size: totalDataSize,
min_segment_rows: -1,
avg_segment_rows: -1,
max_segment_rows: -1,
min_segment_size: -1,
avg_segment_size: -1,
max_segment_size: -1,
total_rows: -1,
avg_row_size: -1,
};
});
} else {
throw new Error(`must have SQL or coordinator access`);
}
const auxiliaryQueries: AuxiliaryQueryFn<DatasourcesAndDefaultRules>[] = [];
if (visibleColumns.shown('Running tasks')) {
if (capabilities.hasSql()) {
auxiliaryQueries.push(async (datasourcesAndDefaultRules, cancelToken) => {
try {
const runningTasks = await queryDruidSql<RunningTaskRow>(
{
query: DatasourcesView.RUNNING_TASK_SQL,
},
cancelToken,
);
const runningTasksByDatasource = groupByAsMap(
runningTasks,
x => x.datasource,
xs =>
groupByAsMap(
xs,
x => normalizeTaskType(x.type),
ys => sum(ys, y => y.num_running_tasks),
),
);
return {
...datasourcesAndDefaultRules,
datasources: datasourcesAndDefaultRules.datasources.map(ds => ({
...ds,
runningTasks: runningTasksByDatasource[ds.datasource] || {},
})),
};
} catch {
AppToaster.show({
icon: IconNames.ERROR,
intent: Intent.DANGER,
message: 'Could not get running task counts',
});
return datasourcesAndDefaultRules;
}
});
} else if (capabilities.hasOverlordAccess()) {
auxiliaryQueries.push(async (datasourcesAndDefaultRules, cancelToken) => {
try {
const taskList = await getApiArray(
`/druid/indexer/v1/tasks?state=running`,
cancelToken,
);
const runningTasksByDatasource = groupByAsMap(
taskList,
(t: any) => t.dataSource,
xs =>
groupByAsMap(
xs,
x => normalizeTaskType(x.type),
ys => ys.length,
),
);
return {
...datasourcesAndDefaultRules,
datasources: datasourcesAndDefaultRules.datasources.map(ds => ({
...ds,
runningTasks: runningTasksByDatasource[ds.datasource] || {},
})),
};
} catch {
AppToaster.show({
icon: IconNames.ERROR,
intent: Intent.DANGER,
message: 'Could not get running task counts',
});
return datasourcesAndDefaultRules;
}
});
}
}
let unused: string[] = [];
if (capabilities.hasCoordinatorAccess()) {
// Unused
const seen = countBy(datasources, x => x.datasource);
if (showUnused) {
try {
unused = (
await getApiArray<string>(
'/druid/coordinator/v1/metadata/datasources?includeUnused',
)
).filter(d => !seen[d]);
} catch {
AppToaster.show({
icon: IconNames.ERROR,
intent: Intent.DANGER,
message: 'Could not get the list of unused datasources',
});
}
}
// Rules
auxiliaryQueries.push(async (datasourcesAndDefaultRules, cancelToken) => {
try {
const rules = (
await Api.instance.get<Record<string, Rule[]>>('/druid/coordinator/v1/rules', {
cancelToken,
})
).data;
return {
datasources: datasourcesAndDefaultRules.datasources.map(ds => ({
...ds,
rules: rules[ds.datasource] || [],
})),
defaultRules: rules[RuleUtil.DEFAULT_RULES_KEY],
};
} catch {
AppToaster.show({
icon: IconNames.ERROR,
intent: Intent.DANGER,
message: 'Could not get load rules',
});
return datasourcesAndDefaultRules;
}
});
// Compaction
auxiliaryQueries.push(async (datasourcesAndDefaultRules, cancelToken) => {
try {
const compactionConfigsAndMore = (
await Api.instance.get<CompactionConfigs>(
'/druid/indexer/v1/compaction/config/datasources',
{ cancelToken },
)
).data;
const compactionConfigs = lookupBy(
compactionConfigsAndMore.compactionConfigs || [],
c => c.dataSource,
);
const compactionStatusesResp = await Api.instance.get<{
latestStatus: CompactionStatus[];
}>('/druid/indexer/v1/compaction/status/datasources', { cancelToken });
const compactionStatuses = lookupBy(
compactionStatusesResp.data.latestStatus || [],
c => c.dataSource,
);
return {
...datasourcesAndDefaultRules,
datasources: datasourcesAndDefaultRules.datasources.map(ds => ({
...ds,
compaction: {
config: compactionConfigs[ds.datasource],
status: compactionStatuses[ds.datasource],
},
})),
};
} catch {
AppToaster.show({
icon: IconNames.ERROR,
intent: Intent.DANGER,
message: 'Could not get compaction information',
});
return datasourcesAndDefaultRules;
}
});
}
return new ResultWithAuxiliaryWork(
{
datasources: datasources.concat(unused.map(makeUnusedDatasource)),
},
auxiliaryQueries,
);
},
onStateChange: datasourcesAndDefaultRulesState => {
this.setState({
datasourcesAndDefaultRulesState,
});
},
});
}