in src/main/java/org/apache/accumulo/proxy/Proxy.java [177:262]
public static ServerAddress createProxyServer(HostAndPort address,
TProtocolFactory protocolFactory, Properties props) throws Exception {
final int numThreads = Integer
.parseInt(props.getProperty(THRIFT_THREAD_POOL_SIZE_KEY, THRIFT_THREAD_POOL_SIZE_DEFAULT));
final long maxFrameSize = ConfigurationTypeHelper.getFixedMemoryAsBytes(
props.getProperty(THRIFT_MAX_FRAME_SIZE_KEY, THRIFT_MAX_FRAME_SIZE_DEFAULT));
// No timeout
final long serverSocketTimeout = 0L;
// Use the new hadoop metrics2 support
final String serverName = "Proxy", threadName = "Accumulo Thrift Proxy";
// create the implementation of the proxy interface
ProxyServer impl = new ProxyServer(props);
// Wrap the implementation -- translate some exceptions
AccumuloProxy.Iface wrappedImpl = TraceUtil.wrapService(impl);
// Create the processor from the implementation
TProcessor processor = new AccumuloProxy.Processor<>(wrappedImpl);
// Get the type of thrift server to instantiate
final String serverTypeStr = props.getProperty(THRIFT_SERVER_TYPE, THRIFT_SERVER_TYPE_DEFAULT);
ThriftServerType serverType = DEFAULT_SERVER_TYPE;
if (!THRIFT_SERVER_TYPE_DEFAULT.equals(serverTypeStr)) {
serverType = ThriftServerType.get(serverTypeStr);
}
SslConnectionParams sslParams = null;
SaslServerConnectionParams saslParams = null;
switch (serverType) {
case SSL:
sslParams = SslConnectionParams.forClient(ClientConfConverter.toAccumuloConf(props));
break;
case SASL:
if (!ClientProperty.SASL_ENABLED.getBoolean(props)) {
throw new IllegalStateException("SASL thrift server was requested but 'sasl.enabled' is"
+ " not set to true in configuration");
}
// Kerberos needs to be enabled to use it
if (!UserGroupInformation.isSecurityEnabled()) {
throw new IllegalStateException("Hadoop security is not enabled");
}
// Login via principal and keytab
final String kerberosPrincipal = ClientProperty.AUTH_PRINCIPAL.getValue(props);
final AuthenticationToken authToken = ClientProperty.getAuthenticationToken(props);
if (!(authToken instanceof KerberosToken)) {
throw new IllegalStateException("Kerberos authentication must be used with SASL");
}
final KerberosToken kerberosToken = (KerberosToken) authToken;
final String kerberosKeytab = kerberosToken.getKeytab().getAbsolutePath();
if (StringUtils.isBlank(kerberosPrincipal) || StringUtils.isBlank(kerberosKeytab)) {
throw new IllegalStateException(
String.format("Kerberos principal '%s' and keytab '%s' must be provided",
kerberosPrincipal, kerberosKeytab));
}
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytab);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
log.info("Logged in as {}", ugi.getUserName());
// The kerberosPrimary set in the SASL server needs to match the principal we're logged in
// as.
final String shortName = ugi.getShortUserName();
log.info("Setting server primary to {}", shortName);
props.setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY.getKey(), shortName);
KerberosToken token = new KerberosToken();
saslParams = new SaslServerConnectionParams(props, token, null);
processor = new UGIAssumingProcessor(processor);
break;
default:
// nothing to do -- no extra configuration necessary
break;
}
TimedProcessor timedProcessor = new TimedProcessor(processor, serverName, threadName);
// Create the thrift server with our processor and properties
return TServerUtils.startTServer(serverType, timedProcessor, protocolFactory, serverName,
threadName, numThreads, ThreadPools.DEFAULT_TIMEOUT_MILLISECS,
ClientConfConverter.toAccumuloConf(props), 1000L, maxFrameSize, sslParams, saslParams,
serverSocketTimeout, address);
}