in plc4j/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xBaseTable.java [131:175]
public Enumerable<Object[]> scan(DataContext root) {
return new AbstractEnumerable<>() {
@Override
public Enumerator<Object[]> enumerator() {
return new Enumerator<>() {
private final AtomicLong counter = new AtomicLong(0);
@Override
public Object[] current() {
List<Object> objects = new ArrayList<>(Arrays.asList(new Timestamp(current.timestamp.toEpochMilli()), current.source));
List<Object> objects2 = names.stream().map(name -> current.values.get(name)).collect(Collectors.toList());
objects.addAll(objects2);
return objects.toArray();
}
@Override
public boolean moveNext() {
try {
current = queue.take();
// If stream, simply return
if (tableCutoff <= 0L) {
return true;
}
// If table, return if below cutoff
return counter.getAndIncrement() < tableCutoff;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
@Override
public void reset() {
counter.set(0);
}
@Override
public void close() {
// Unimplemented
}
};
}
};
}