in samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java [63:141]
/**
 * Builds a new {@link RemoteTable} for {@code tableId} from the job configuration, initializes it
 * with this provider's context, records it in {@code tables} and returns it.
 *
 * <p>The read/write functions, rate limiter, credit functions, retry policies and batch provider
 * are all obtained through {@code deserializeObject} and may be {@code null} when not configured.
 * The callback, rate-limiting and batch executors are created at most once per table id; the
 * retry executor is created at most once and reused by subsequent calls.
 *
 * @return the newly created and initialized table
 * @throws NumberFormatException if the configured async callback pool size is not a valid integer
 */
public ReadWriteUpdateTable getTable() {
  // Fails fast when the provider has not been given a context yet.
  Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
  JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());

  // Read part
  TableReadFunction readFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_FN);
  RateLimiter rateLimiter = deserializeObject(tableConfig, RemoteTableDescriptor.RATE_LIMITER);
  if (rateLimiter != null) {
    rateLimiter.init(this.context);
  }
  TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_CREDIT_FN);
  // Reads are only rate limited when the configured limiter supports the read tag.
  TableRateLimiter readRateLimiter = rateLimiter != null && rateLimiter.getSupportedTags().contains(RemoteTableDescriptor.RL_READ_TAG)
      ? new TableRateLimiter(tableId, rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG)
      : null;
  TableRetryPolicy readRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.READ_RETRY_POLICY);

  // Write part
  // Reuse write rate limiter for update
  TableRateLimiter writeRateLimiter = null;
  TableRetryPolicy writeRetryPolicy = null;
  TableWriteFunction writeFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_FN);
  if (writeFn != null) {
    TableRateLimiter.CreditFunction<?, ?> writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
    // Writes are only rate limited when the configured limiter supports the write tag.
    writeRateLimiter = rateLimiter != null && rateLimiter.getSupportedTags().contains(RemoteTableDescriptor.RL_WRITE_TAG)
        ? new TableRateLimiter(tableId, rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG)
        : null;
    writeRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_RETRY_POLICY);
  }

  // Create the retry executor only once. Re-creating it on every call would overwrite the field
  // and drop the reference to the previous executor without shutting it down, leaking its thread.
  // NOTE(review): assumes nothing else resets or shuts down retryExecutor between calls -- confirm.
  if (retryExecutor == null && (readRetryPolicy != null || writeRetryPolicy != null)) {
    retryExecutor = createRetryExecutor();
  }

  // Optional executor for future callback/completion. Shared by both read and write operations.
  // A missing setting falls back to "-1", which leaves the callback pool disabled.
  int callbackPoolSize = Integer.parseInt(tableConfig.getForTable(tableId, RemoteTableDescriptor.ASYNC_CALLBACK_POOL_SIZE, "-1"));
  if (callbackPoolSize > 0) {
    callbackExecutors.computeIfAbsent(tableId, (arg) ->
        Executors.newFixedThreadPool(callbackPoolSize,
            (runnable) -> newDaemonThread(runnable, "table-" + tableId + "-async-callback-pool")));
  }

  // A single-threaded executor is only needed when at least one direction is rate limited.
  boolean isRateLimited = readRateLimiter != null || writeRateLimiter != null;
  if (isRateLimited) {
    rateLimitingExecutors.computeIfAbsent(tableId, (arg) ->
        Executors.newSingleThreadExecutor(
            runnable -> newDaemonThread(runnable, "table-" + tableId + "-async-executor")));
  }

  // A scheduled executor is only needed when a batch provider is configured.
  BatchProvider batchProvider = deserializeObject(tableConfig, RemoteTableDescriptor.BATCH_PROVIDER);
  if (batchProvider != null) {
    batchExecutors.computeIfAbsent(tableId, (arg) ->
        Executors.newSingleThreadScheduledExecutor(
            runnable -> newDaemonThread(runnable, "table-" + tableId + "-batch-scheduled-executor")));
  }

  // Executors that were not created above are passed as null (map lookup miss).
  // The write rate limiter is passed twice: once for writes and once for updates.
  RemoteTable table = new RemoteTable(tableId,
      readFn, writeFn,
      readRateLimiter, writeRateLimiter, writeRateLimiter, rateLimitingExecutors.get(tableId),
      readRetryPolicy, writeRetryPolicy, retryExecutor, batchProvider, batchExecutors.get(tableId),
      callbackExecutors.get(tableId));
  table.init(this.context);
  tables.add(table);
  return table;
}

/**
 * Creates a daemon thread with the given name that runs {@code runnable}.
 *
 * @param runnable the task the thread will run
 * @param name the thread name
 * @return the new, not yet started, daemon thread
 */
private static Thread newDaemonThread(Runnable runnable, String name) {
  Thread thread = new Thread(runnable);
  thread.setName(name);
  thread.setDaemon(true);
  return thread;
}