in hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java [156:342]
public void init() throws PDException {
log.info("PDService init………… {}", pdConfig);
configService = new ConfigService(pdConfig);
RaftEngine.getInstance().addStateListener(this);
RaftEngine.getInstance().addStateListener(configService);
RaftEngine.getInstance().init(pdConfig.getRaft());
//pdConfig = configService.loadConfig(); onLeaderChanged
storeNodeService = new StoreNodeService(pdConfig);
partitionService = new PartitionService(pdConfig, storeNodeService);
taskService = new TaskScheduleService(pdConfig, storeNodeService, partitionService);
idService = new IdService(pdConfig);
logService = new LogService(pdConfig);
storeMonitorDataService = new StoreMonitorDataService(pdConfig);
//if (licenseVerifierService == null) {
// licenseVerifierService = new LicenseVerifierService(pdConfig);
//}
RaftEngine.getInstance().addStateListener(partitionService);
pdConfig.setIdService(idService);
// Receive a heartbeat message
PDPulseSubject.listenPartitionHeartbeat(new PulseListener<PartitionHeartbeatRequest>() {
@Override
public void onNext(PartitionHeartbeatRequest request) throws Exception {
partitionService.partitionHeartbeat(request.getStates());
}
@Override
public void onError(Throwable throwable) {
log.error("Received an error notice from pd-client", throwable);
}
@Override
public void onCompleted() {
log.info("Received an completed notice from pd-client");
}
});
/**
// Listen for partition commands and forward them to Store
*/
partitionService.addInstructionListener(new PartitionInstructionListener() {
private PartitionHeartbeatResponse.Builder getBuilder(Metapb.Partition partition) throws
PDException {
return PartitionHeartbeatResponse.newBuilder().setPartition(partition)
.setId(idService.getId(TASK_ID_KEY, 1));
}
@Override
public void changeShard(Metapb.Partition partition, ChangeShard changeShard) throws
PDException {
PDPulseSubject.notifyClient(getBuilder(partition).setChangeShard(changeShard));
}
@Override
public void transferLeader(Metapb.Partition partition,
TransferLeader transferLeader) throws
PDException {
PDPulseSubject.notifyClient(
getBuilder(partition).setTransferLeader(transferLeader));
}
@Override
public void splitPartition(Metapb.Partition partition,
SplitPartition splitPartition) throws
PDException {
PDPulseSubject.notifyClient(
getBuilder(partition).setSplitPartition(splitPartition));
}
@Override
public void dbCompaction(Metapb.Partition partition, DbCompaction dbCompaction) throws
PDException {
PDPulseSubject.notifyClient(getBuilder(partition).setDbCompaction(dbCompaction));
}
@Override
public void movePartition(Metapb.Partition partition,
MovePartition movePartition) throws PDException {
PDPulseSubject.notifyClient(getBuilder(partition).setMovePartition(movePartition));
}
@Override
public void cleanPartition(Metapb.Partition partition,
CleanPartition cleanPartition) throws PDException {
PDPulseSubject.notifyClient(
getBuilder(partition).setCleanPartition(cleanPartition));
}
@Override
public void changePartitionKeyRange(Metapb.Partition partition,
PartitionKeyRange partitionKeyRange)
throws PDException {
PDPulseSubject.notifyClient(getBuilder(partition).setKeyRange(partitionKeyRange));
}
});
/**
// Listen for partition status change messages and forward them to Client
*/
partitionService.addStatusListener(new PartitionStatusListener() {
@Override
public void onPartitionChanged(Metapb.Partition old, Metapb.Partition partition) {
PDWatchSubject.notifyPartitionChange(PDWatchSubject.ChangeType.ALTER,
partition.getGraphName(), partition.getId());
}
@Override
public void onPartitionRemoved(Metapb.Partition partition) {
PDWatchSubject.notifyPartitionChange(PDWatchSubject.ChangeType.DEL,
partition.getGraphName(),
partition.getId());
}
});
storeNodeService.addShardGroupStatusListener(new ShardGroupStatusListener() {
@Override
public void onShardListChanged(Metapb.ShardGroup shardGroup,
Metapb.ShardGroup newShardGroup) {
// invoked before change, saved to db and update cache.
if (newShardGroup == null) {
PDWatchSubject.notifyShardGroupChange(PDWatchSubject.ChangeType.DEL,
shardGroup.getId(),
shardGroup);
} else {
PDWatchSubject.notifyShardGroupChange(PDWatchSubject.ChangeType.ALTER,
shardGroup.getId(), newShardGroup);
}
}
@Override
public void onShardListOp(Metapb.ShardGroup shardGroup) {
PDWatchSubject.notifyShardGroupChange(PDWatchSubject.ChangeType.USER_DEFINED,
shardGroup.getId(), shardGroup);
}
});
/**
// Listen for store status change messages and forward them to Client
*/
storeNodeService.addStatusListener(new StoreStatusListener() {
@Override
public void onStoreStatusChanged(Metapb.Store store,
Metapb.StoreState old,
Metapb.StoreState status) {
NodeEventType type = NodeEventType.NODE_EVENT_TYPE_UNKNOWN;
if (status == Metapb.StoreState.Up) {
type = NodeEventType.NODE_EVENT_TYPE_NODE_ONLINE;
} else if (status == Metapb.StoreState.Offline) {
type = NodeEventType.NODE_EVENT_TYPE_NODE_OFFLINE;
}
PDWatchSubject.notifyNodeChange(type, "", store.getId());
}
@Override
public void onGraphChange(Metapb.Graph graph,
Metapb.GraphState stateOld,
Metapb.GraphState stateNew) {
WatchGraphResponse wgr = WatchGraphResponse.newBuilder()
.setGraph(graph)
.build();
WatchResponse.Builder wr = WatchResponse.newBuilder()
.setGraphResponse(wgr);
PDWatchSubject.notifyChange(WatchType.WATCH_TYPE_GRAPH_CHANGE,
wr);
}
@Override
public void onStoreRaftChanged(Metapb.Store store) {
PDWatchSubject.notifyNodeChange(NodeEventType.NODE_EVENT_TYPE_NODE_RAFT_CHANGE, "",
store.getId());
}
});
storeNodeService.init(partitionService);
partitionService.init();
taskService.init();
// log.info("init .......");
// licenseVerifierService.init();
// UpgradeService upgradeService = new UpgradeService(pdConfig);
// upgradeService.upgrade();
}