in asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslator.java [316:384]
/**
 * Creates an index and then redeploys every channel and procedure that reads the indexed dataset,
 * so that their job specifications are rebuilt after the index exists.
 *
 * <p>Steps, in order:
 * <ol>
 * <li>Inside one metadata transaction, collect the channels and procedures that use the dataset
 * directly, plus those that use a function whose first dependency list names the dataset.</li>
 * <li>Suspend the active listener of each collected channel and procedure, commit the metadata
 * transaction and release the locks held by {@code metadataProvider}.</li>
 * <li>Delegate the index creation to the superclass with a fresh {@link MetadataProvider}.</li>
 * <li>Redeploy the job specification of each collected channel and procedure, each with its own
 * fresh {@link MetadataProvider}.</li>
 * </ol>
 *
 * <p>If the first two steps fail, the metadata transaction is aborted and the locks are released
 * before the original exception is rethrown. Listeners that were already suspended at that point
 * are not resumed by this method.
 *
 * @param metadataProvider provider that receives the metadata transaction context used for the usage lookup
 * @param stmt the statement to execute; must be a {@link CreateIndexStatement}
 * @param hcc Hyracks connection passed on to the index creation and to every redeployment
 * @param requestParameters request parameters passed on to the index creation and to every redeployment
 * @throws Exception if the usage lookup, the suspension, the index creation or a redeployment fails
 */
public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
        IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
    CreateIndexStatement createIndexStmt = (CreateIndexStatement) stmt;
    Pair<List<Channel>, List<Procedure>> usages;

    MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
    metadataProvider.setMetadataTxnContext(mdTxnCtx);
    try {
        DataverseName dvId = getActiveDataverseName(createIndexStmt.getDataverseName());
        String dsId = createIndexStmt.getDatasetName().getValue();

        // Direct users of the dataset first, then the users that reach it through a function.
        usages = checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId, true);
        collectUsagesViaFunctions(mdTxnCtx, dvId, dsId, usages);

        // Suspend the affected jobs before the index is created; they are redeployed afterwards.
        ActiveNotificationHandler activeEventHandler =
                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
        for (Channel channel : usages.first) {
            DeployedJobSpecEventListener listener =
                    (DeployedJobSpecEventListener) activeEventHandler.getListener(channel.getChannelId());
            listener.suspend();
        }
        for (Procedure procedure : usages.second) {
            DeployedJobSpecEventListener listener =
                    (DeployedJobSpecEventListener) activeEventHandler.getListener(procedure.getEntityId());
            listener.suspend();
        }
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
    } catch (Exception e) {
        // Do not leave the metadata transaction open; keep the original failure as the primary error.
        try {
            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
        } catch (Exception abortFailure) {
            e.addSuppressed(abortFailure);
        }
        throw e;
    } finally {
        // Released on success and on failure alike.
        metadataProvider.getLocks().unlock();
    }

    // The index creation runs with its own provider, not the one used for the lookup above.
    MetadataProvider indexProvider = MetadataProvider.create(appCtx, activeDataverse);
    super.handleCreateIndexStatement(indexProvider, createIndexStmt, hcc, requestParameters);

    // Allow channels and procedures to use the new index.
    for (Channel channel : usages.first) {
        MetadataProvider redeployProvider = MetadataProvider.create(appCtx, activeDataverse);
        try {
            BADJobService.redeployJobSpec(channel.getChannelId(), channel.getChannelBody(), redeployProvider,
                    this, hcc, requestParameters, false);
        } finally {
            redeployProvider.getLocks().unlock();
        }
    }
    for (Procedure procedure : usages.second) {
        MetadataProvider redeployProvider = MetadataProvider.create(appCtx, activeDataverse);
        try {
            BADJobService.redeployJobSpec(procedure.getEntityId(), procedure.getBody(), redeployProvider, this,
                    hcc, requestParameters, false);
        } finally {
            redeployProvider.getLocks().unlock();
        }
    }
}

/**
 * Adds to {@code usages} the channels and procedures that use any function, in any dataverse,
 * whose first dependency list contains the dataset {@code dvId.dsId}.
 */
private void collectUsagesViaFunctions(MetadataTransactionContext mdTxnCtx, DataverseName dvId, String dsId,
        Pair<List<Channel>, List<Procedure>> usages) throws Exception {
    List<Dataverse> dataverseList = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
    for (Dataverse dv : dataverseList) {
        List<Function> functions = MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx, dv.getDataverseName());
        for (Function function : functions) {
            for (Triple<DataverseName, String, String> datasetDependency : function.getDependencies().get(0)) {
                if (datasetDependency.first.equals(dvId) && datasetDependency.second.equals(dsId)) {
                    Pair<List<Channel>, List<Procedure>> functionUsages =
                            checkIfFunctionIsInUse(mdTxnCtx, function.getDataverseName(), function.getName(),
                                    Integer.toString(function.getArity()), true);
                    addAllAbsent(usages.first, functionUsages.first);
                    addAllAbsent(usages.second, functionUsages.second);
                }
            }
        }
    }
}

/** Appends each element of {@code candidates} that {@code target} does not already contain. */
private static <T> void addAllAbsent(List<T> target, List<T> candidates) {
    for (T candidate : candidates) {
        if (!target.contains(candidate)) {
            target.add(candidate);
        }
    }
}