in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/Evaluator.java [143:205]
/**
 * Immutable wrapper around a set of {@link DataSource} instances. The canonical constructor
 * stores an unmodifiable copy of the set that is passed in, so later changes to the caller's
 * set are not reflected in this record.
 */
public record DataSources(Set<DataSource> sources) {

  /** Returns an instance that contains no data sources. */
  public static DataSources empty() {
    return new DataSources(Set.of());
  }

  /** Returns an instance containing the given data sources; duplicates are collapsed. */
  public static DataSources of(DataSource... sources) {
    Set<DataSource> unique = new HashSet<>();
    Collections.addAll(unique, sources);
    return new DataSources(unique);
  }

  /** Canonical constructor: keeps an unmodifiable snapshot of the supplied set. */
  public DataSources {
    sources = Set.copyOf(sources);
  }

  /**
   * Returns the data sources that are present in this instance but not in {@code other}.
   * The result is a new, modifiable set that is independent of both instances.
   */
  public Set<DataSource> addedSources(DataSources other) {
    // Dereference up front so a null argument fails even when this set is empty.
    final Set<DataSource> theirs = other.sources();
    return sources.stream()
        .filter(candidate -> !theirs.contains(candidate))
        .collect(Collectors.toCollection(HashSet::new));
  }

  /**
   * Returns the data sources that are present in {@code other} but not in this instance.
   * This is the mirror image of {@link #addedSources(DataSources)}.
   */
  public Set<DataSource> removedSources(DataSources other) {
    return other.addedSources(this);
  }

  /** Returns a new instance restricted to the sources for which {@code isLocal()} is false. */
  DataSources remoteOnly() {
    Set<DataSource> nonLocal = sources.stream()
        .filter(candidate -> !candidate.isLocal())
        .collect(Collectors.toSet());
    return new DataSources(nonLocal);
  }

  /** Returns a new instance restricted to the sources for which {@code isLocal()} is true. */
  DataSources localOnly() {
    return new DataSources(localSources());
  }

  /** Collects the sources for which {@code isLocal()} is true into a fresh set. */
  private Set<DataSource> localSources() {
    Set<DataSource> local = new HashSet<>();
    for (DataSource candidate : sources) {
      if (candidate.isLocal()) {
        local.add(candidate);
      }
    }
    return local;
  }

  /**
   * Returns the step size, in milliseconds, shared by the sources in this set. An
   * IllegalStateException is thrown as soon as a source is found whose step differs from
   * the one seen before it. For an empty set the result is -1.
   */
  long stepSize() {
    long expected = -1L;
    for (DataSource current : sources) {
      final long actual = current.step().toMillis();
      if (expected == -1L || expected == actual) {
        // Either nothing recorded yet or this source agrees with the previous one.
        expected = actual;
        continue;
      }
      throw new IllegalStateException("inconsistent step sizes, expected "
          + expected + ", found " + actual + " on " + current);
    }
    return expected;
  }
}