in asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java [244:325]
    public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
            IRequestParameters requestContext, MetadataProvider metadataProvider, int resultSetId)
            throws HyracksDataException, AlgebricksException {
        // This method performs three tasks:
        // 1. Create the datasets for the channel
        // 2. Create and run the channel job
        // 3. Create the metadata entry for the channel
        // TODO: Figure out how to handle failures when only a subset of the three tasks succeeds
        dataverseName = statementExecutor.getActiveDataverseName(dataverseName);
        subscriptionsTableName = channelName + BADConstants.subscriptionEnding;
        resultsTableName = push ? "" : channelName + BADConstants.resultsEnding;
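        // Look up any existing active-entity listener for this channel so we can tell whether it is already running.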
        EntityId entityId = new EntityId(BADConstants.RUNTIME_ENTITY_CHANNEL, dataverseName, channelName.getValue());
        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
        ActiveNotificationHandler activeEventHandler =
                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
        DeployedJobSpecEventListener listener =
                (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
        boolean alreadyActive = false;
        Channel channel;
        MetadataTransactionContext mdTxnCtx = null;
        try {
            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
            metadataProvider.setMetadataTxnContext(mdTxnCtx);
            channel = BADLangExtension.getChannel(mdTxnCtx, dataverseName, channelName.getValue());
            if (channel != null) {
                throw new AlgebricksException("A channel with the name " + channelName + " already exists.");
            }
            if (listener != null) {
                alreadyActive = listener.isActive();
            }
            if (alreadyActive) {
                throw new AsterixException("Channel " + channelName + " is already running");
            }
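            // Statement-specific initialization (expected to be provided by the concrete subclass),
            // e.g. building the channel body used when the Channel entity is created below.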
            initialize(statementExecutor, metadataProvider, mdTxnCtx);
            // Check that the dataset names are available before creating anything
            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, subscriptionsTableName) != null) {
                throw new AsterixException("The channel name: " + channelName + " is not available.");
            }
            if (!push && MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, resultsTableName) != null) {
                throw new AsterixException("The channel name: " + channelName + " is not available.");
            }
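            // Work against a temporary copy of the metadata provider so that the config entries and locks used while
            // creating the datasets and compiling the channel job do not leak into the caller's provider.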
            MetadataProvider tempMdProvider = BADUtils.replicateMetadataProvider(metadataProvider);
            tempMdProvider.setMaxResultReads(requestContext.getResultProperties().getMaxReads());
            final IResultSet resultSet = requestContext.getResultSet();
            final Stats stats = requestContext.getStats();
            tempMdProvider.getConfig().put(BADConstants.CONFIG_CHANNEL_NAME, channelName.getValue());
            // Create the channel datasets
            createDatasets(statementExecutor, tempMdProvider, hcc);
            tempMdProvider.getLocks().reset();
            // Compile the channel's internal job
            JobSpecification channelJobSpec =
                    createChannelJob(statementExecutor, tempMdProvider, hcc, resultSet, stats);
            // Register an active-entity listener for this channel if one does not already exist
            if (listener == null) {
                listener = new DeployedJobSpecEventListener(appCtx, entityId,
                        push ? PrecompiledType.PUSH_CHANNEL : PrecompiledType.CHANNEL);
                activeEventHandler.registerListener(listener);
            }
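            // Deploy the compiled channel job and schedule its repeated execution (per the channel's duration).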
            BADJobService.setupExecutorJob(entityId, channelJobSpec, hcc, listener,
                    metadataProvider.getTxnIdFactory(), duration);
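            // Finally, persist the channel's metadata entry and commit the metadata transaction.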
            channel = new Channel(dataverseName, channelName.getValue(), subscriptionsTableName, resultsTableName,
                    function, duration, null, body);
            MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
        } catch (Exception e) {
            if (mdTxnCtx != null) {
                QueryTranslator.abort(e, e, mdTxnCtx);
            }
            LOGGER.log(Level.WARNING, "Failed creating a channel", e);
            throw HyracksDataException.create(e);
        } finally {
            metadataProvider.getLocks().unlock();
        }
    }