public ReadWriteUpdateTable getTable()

in samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java [63:141]


  /**
   * Assembles a {@link RemoteTable} for {@code tableId} from the job configuration, initializes it
   * with this provider's context, records it in {@code tables}, and returns it.
   *
   * <p>The read function, rate limiter, credit functions, retry policies and batch provider are all
   * obtained through {@code deserializeObject}. Write-side rate limiting and retry are only set up
   * when a write function is configured; the write rate limiter is also passed as the update rate
   * limiter. Callback, rate-limiting and batch executors are registered per table id through
   * {@code computeIfAbsent}, so an executor already registered for this table id is reused.
   *
   * @return the newly created and initialized table
   */
  public ReadWriteUpdateTable getTable() {
    // The provider must have been given a context before a table can be built.
    Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));

    JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());

    // ---- Read side ----
    TableReadFunction readFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_FN);
    RateLimiter rateLimiter = deserializeObject(tableConfig, RemoteTableDescriptor.RATE_LIMITER);
    if (rateLimiter != null) {
      // Initialize first: the supported tags are queried right below.
      rateLimiter.init(this.context);
    }

    TableRateLimiter.CreditFunction<?, ?> readCreditFn =
        deserializeObject(tableConfig, RemoteTableDescriptor.READ_CREDIT_FN);
    TableRateLimiter readRateLimiter = null;
    if (rateLimiter != null && rateLimiter.getSupportedTags().contains(RemoteTableDescriptor.RL_READ_TAG)) {
      readRateLimiter = new TableRateLimiter(tableId, rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
    }
    TableRetryPolicy readRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.READ_RETRY_POLICY);

    // ---- Write side (the write rate limiter doubles as the update rate limiter) ----
    TableRateLimiter writeRateLimiter = null;
    TableRetryPolicy writeRetryPolicy = null;
    TableWriteFunction writeFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_FN);
    if (writeFn != null) {
      TableRateLimiter.CreditFunction<?, ?> writeCreditFn =
          deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
      if (rateLimiter != null && rateLimiter.getSupportedTags().contains(RemoteTableDescriptor.RL_WRITE_TAG)) {
        writeRateLimiter =
            new TableRateLimiter(tableId, rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
      }
      writeRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_RETRY_POLICY);
    }

    // A retry executor is only needed when at least one side has a retry policy.
    boolean retryConfigured = readRetryPolicy != null || writeRetryPolicy != null;
    if (retryConfigured) {
      retryExecutor = createRetryExecutor();
    }

    // Optional pool for future callbacks/completions, shared by reads and writes.
    // The configured size defaults to "-1"; only a positive size creates the pool.
    final int poolSize =
        Integer.parseInt(tableConfig.getForTable(tableId, RemoteTableDescriptor.ASYNC_CALLBACK_POOL_SIZE, "-1"));
    if (poolSize > 0) {
      callbackExecutors.computeIfAbsent(tableId, id ->
          Executors.newFixedThreadPool(poolSize, task ->
              newNamedDaemonThread(task, "table-" + tableId + "-async-callback-pool")));
    }

    // Single-threaded executor used when either side is rate limited.
    if (readRateLimiter != null || writeRateLimiter != null) {
      rateLimitingExecutors.computeIfAbsent(tableId, id ->
          Executors.newSingleThreadExecutor(task ->
              newNamedDaemonThread(task, "table-" + tableId + "-async-executor")));
    }

    // Scheduled executor used only when a batch provider is configured.
    BatchProvider batchProvider = deserializeObject(tableConfig, RemoteTableDescriptor.BATCH_PROVIDER);
    if (batchProvider != null) {
      batchExecutors.computeIfAbsent(tableId, id ->
          Executors.newSingleThreadScheduledExecutor(task ->
              newNamedDaemonThread(task, "table-" + tableId + "-batch-scheduled-executor")));
    }

    // Executors that were not registered above resolve to whatever the maps hold (possibly null).
    RemoteTable table = new RemoteTable(
        tableId,
        readFn,
        writeFn,
        readRateLimiter,
        writeRateLimiter,
        writeRateLimiter,
        rateLimitingExecutors.get(tableId),
        readRetryPolicy,
        writeRetryPolicy,
        retryExecutor,
        batchProvider,
        batchExecutors.get(tableId),
        callbackExecutors.get(tableId));
    table.init(this.context);
    tables.add(table);
    return table;
  }

  /**
   * Creates a daemon thread that runs {@code task} under the given name.
   *
   * @param task the work the thread will execute
   * @param threadName the name assigned to the thread
   * @return a new, not yet started, daemon thread
   */
  private static Thread newNamedDaemonThread(Runnable task, String threadName) {
    Thread thread = new Thread(task);
    thread.setName(threadName);
    thread.setDaemon(true);
    return thread;
  }