in src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java [614:1054]
private Vertex newVertex(TezOperator tezOp) throws IOException,
ClassNotFoundException, InterruptedException {
ProcessorDescriptor procDesc = ProcessorDescriptor.create(
tezOp.getProcessorName());
// Pass physical plans to vertex as user payload.
JobConf payloadConf = new JobConf(pigContextConf);
// We do this so that dag.getCredentials(), job.getCredentials(),
// job.getConfiguration().getCredentials() all reference the same Credentials object
// Unfortunately there is no setCredentials() on Job
payloadConf.setCredentials(dag.getCredentials());
        // We won't actually use this job, but we need it to talk to the Load/Store funcs
@SuppressWarnings("deprecation")
Job job = new Job(payloadConf);
payloadConf = (JobConf) job.getConfiguration();
        // TODO: Investigate. Setting as map writes empty output.
//payloadConf.setBoolean(MRConfig.IS_MAP_PROCESSOR, tezOp.isUseMRMapSettings());
payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
payloadConf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS,
PigInputFormatTez.class, InputFormat.class);
setOutputFormat(job);
payloadConf.set("udf.import.list", serializedUDFImportList);
payloadConf.set("exectype", "TEZ");
payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));
DateTimeWritable.setupAvailableZoneIds();
// Process stores
LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
// Process UserFuncs
processUserFuncs(tezOp, job);
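        // Snapshot separate input and output confs: the output copy is taken
        // before load settings are added, the input copy after, so each payload
        // carries only the settings its side needs.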
Configuration inputPayLoad = null;
Configuration outputPayLoad = null;
if (!stores.isEmpty()) {
outputPayLoad = new Configuration(payloadConf);
outputPayLoad.set(JobControlCompiler.PIG_MAP_STORES,
ObjectSerializer.serialize(new ArrayList<POStore>()));
}
if (!(tezOp.getLoaderInfo().getLoads().isEmpty())) {
payloadConf.set(PigInputFormat.PIG_LOADS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getLoads()));
payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
inputPayLoad = new Configuration(payloadConf);
}
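        // Record the names of the related sample and sort vertices so
        // PigProcessor can find the quantile/partition data at runtime.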
if (tezOp.getSampleOperator() != null) {
payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString());
}
if (tezOp.getSortOperator() != null) {
// Required by Sample Aggregation job for estimating quantiles
payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.getSortOperator().getOperatorKey().toString());
            // PIG-4162: Order by/Skew Join in intermediate stage.
            // Increasing order-by parallelism is usually not required, as it is
            // typically followed by a limit rather than a store. But it would
            // benefit cases like a skewed join followed by a group by.
if (tezOp.getSortOperator().getEstimatedParallelism() != -1
&& tezOp.getSortOperator().isIntermediateReducer()) {
payloadConf.setLong(
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
intermediateTaskInputSize);
}
}
// Set parent plan for all operators in the Tez plan.
new PhyPlanSetter(tezOp.plan).visit();
// Set the endOfAllInput flag on the physical plan if certain operators that
// use this property (such as STREAM) are present in the plan.
EndOfAllInputSetter.EndOfAllInputChecker checker =
new EndOfAllInputSetter.EndOfAllInputChecker(tezOp.plan);
checker.visit();
if (checker.isEndOfAllInputPresent()) {
payloadConf.set(JobControlCompiler.END_OF_INP_IN_MAP, "true");
}
// Configure the classes for incoming shuffles to this TezOp
// TODO: Refactor out resetting input keys, PIG-3957
List<PhysicalOperator> roots = tezOp.plan.getRoots();
if (roots.size() == 1 && roots.get(0) instanceof POPackage) {
POPackage pack = (POPackage) roots.get(0);
List<PhysicalOperator> succsList = tezOp.plan.getSuccessors(pack);
if (succsList != null) {
succsList = new ArrayList<PhysicalOperator>(succsList);
}
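            // Swap the POPackage root for a POShuffleTezLoad that reads the shuffled
            // inputs directly from the Tez runtime; the original package is serialized
            // separately as pig.reduce.package for the backend.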
byte keyType = pack.getPkgr().getKeyType();
tezOp.plan.remove(pack);
payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
POShuffleTezLoad newPack = new POShuffleTezLoad(pack);
if (tezOp.isSkewedJoin()) {
newPack.setSkewedJoins(true);
}
tezOp.plan.add(newPack);
boolean isMergedInput = false;
// Set input keys for POShuffleTezLoad. This is used to identify
// the inputs that are attached to the POShuffleTezLoad in the
// backend.
Map<Integer, String> localRearrangeMap = new TreeMap<Integer, String>();
TezOperator from = null;
for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
if (tezOp.getSampleOperator() != null && tezOp.getSampleOperator() == pred) {
// skip sample vertex input
} else {
String inputKey = pred.getOperatorKey().toString();
boolean isVertexGroup = false;
if (pred.isVertexGroup()) {
isVertexGroup = true;
pred = mPlan.getOperator(pred.getVertexGroupMembers().get(0));
}
LinkedList<POLocalRearrangeTez> lrs =
PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
for (POLocalRearrangeTez lr : lrs) {
if (lr.isConnectedToPackage()
&& lr.containsOutputKey(tezOp.getOperatorKey().toString())) {
localRearrangeMap.put((int) lr.getIndex(), inputKey);
if (isVertexGroup) {
isMergedInput = true;
}
from = pred;
}
}
}
}
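            // localRearrangeMap is a TreeMap keyed by the local rearrange index, so
            // input keys are registered on POShuffleTezLoad in index order.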
for (Map.Entry<Integer, String> entry : localRearrangeMap.entrySet()) {
newPack.addInputKey(entry.getValue());
}
if (succsList != null) {
for (PhysicalOperator succs : succsList) {
tezOp.plan.connect(newPack, succs);
}
}
            // POShuffleTezLoad accesses the comparator setting
selectKeyComparator(keyType, payloadConf, tezOp, isMergedInput);
if (tezOp.isUseSecondaryKey()) {
TezEdgeDescriptor edge = tezOp.inEdges.get(from.getOperatorKey());
                // Currently only PigSecondaryKeyGroupingComparator is used in POShuffleTezLoad.
                // When PIG-4685 (SecondaryKeyOptimizerTez does not optimize cogroup) is fixed
                // in the future, PigSecondaryKeyComparator will have to be used, and that will
                // require this.
payloadConf.set("pig.secondarySortOrder", ObjectSerializer
.serialize(edge.getSecondarySortOrder()));
}
}
        // Set parent plan in all operators. Currently the parent plan is really
        // used only when POStream or POSplit are present in the plan.
new PhyPlanSetter(tezOp.plan).visit();
// Serialize the execution plan
payloadConf.set(PigProcessor.PLAN,
ObjectSerializer.serialize(tezOp.plan));
udfContextSeparator.serializeUDFContext(payloadConf, tezOp);
if (!pc.inIllustrator) {
for (POStore store : stores) {
                // Unset inputs for POStore; otherwise the map/reduce plan will be
                // unnecessarily deserialized
store.setInputs(null);
store.setParentPlan(null);
}
// We put them in the reduce because PigOutputCommitter checks the
// ID of the task to see if it's a map, and if not, calls the reduce
// committers.
payloadConf.set(JobControlCompiler.PIG_MAP_STORES,
ObjectSerializer.serialize(new ArrayList<POStore>()));
payloadConf.set(JobControlCompiler.PIG_REDUCE_STORES,
ObjectSerializer.serialize(stores));
}
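        // This is the sample aggregation vertex; it also computes the
        // parallelism estimate used by the sort vertex.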
if (tezOp.isNeedEstimateParallelism()) {
payloadConf.setBoolean(PigProcessor.ESTIMATE_PARALLELISM, true);
log.info("Estimate quantile for sample aggregation vertex " + tezOp.getOperatorKey().toString());
}
// set various parallelism into the job conf for later analysis, PIG-2779
payloadConf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pc.defaultParallel);
payloadConf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, tezOp.getRequestedParallelism());
payloadConf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, tezOp.getEstimatedParallelism());
TezScriptState ss = TezScriptState.get();
ss.addVertexSettingsToConf(dag.getName(), tezOp, payloadConf);
// Take our assembled configuration and create a vertex
UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
TezDAGScriptInfo dagScriptInfo = TezScriptState.get().getDAGScriptInfo(dag.getName());
String alias = dagScriptInfo.getAlias(tezOp);
String aliasLocation = dagScriptInfo.getAliasLocation(tezOp);
String features = dagScriptInfo.getPigFeatures(tezOp);
        String vertexInfo = aliasLocation + " (" + features + ")";
procDesc.setUserPayload(userPayload).setHistoryText(TezUtils.convertToHistoryText(vertexInfo, payloadConf));
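        // Choose the VertexManagerPlugin: ShuffleVertexManager for scatter-gather
        // edges, PartitionerDefinedVertexManager for order by/skewed join with
        // dynamic parallelism, or PigGraceShuffleVertexManager for grace auto
        // parallelism (all set below).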
String vmPluginName = null;
Configuration vmPluginConf = null;
boolean containScatterGather = false;
boolean containCustomPartitioner = false;
for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
containScatterGather = true;
}
if (edge.partitionerClass != null) {
containCustomPartitioner = true;
}
}
        if (containScatterGather) {
vmPluginName = ShuffleVertexManager.class.getName();
vmPluginConf = new Configuration(shuffleVertexManagerBaseConf);
}
// Set the right VertexManagerPlugin
if (tezOp.getEstimatedParallelism() != -1) {
boolean autoParallelism = false;
            if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
                if (tezOp.getVertexParallelism() == -1
                        && (tezOp.isGlobalSort() && getPlan().getPredecessors(tezOp).size() == 1
                                || tezOp.isSkewedJoin() && getPlan().getPredecessors(tezOp).size() == 2)) {
// Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able
// to decrease/increase parallelism of sorting vertex dynamically
// based on the numQuantiles calculated by sample aggregation vertex
vmPluginName = PartitionerDefinedVertexManager.class.getName();
autoParallelism = true;
log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
}
} else {
if (containScatterGather && !containCustomPartitioner) {
                    // For intermediate reduce, set the bytes per reducer to the block size.
                    long bytesPerReducer = intermediateTaskInputSize;
                    // If there are store statements, use the BYTES_PER_REDUCER_PARAM configured by the user.
                    // If not, default to 384MB for group bys and 256MB for joins. We do not use the
                    // default 1G, as that value suited the MapReduce logic where
                    // numReducers = (map input size / bytesPerReducer). In Tez,
                    // numReducers = (map output size / bytesPerReducer), so lower values are needed
                    // to avoid skew in the reduce, as map input sizes are almost always high
                    // compared to map output.
if (stores.size() > 0) {
if (pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
bytesPerReducer = pigContextConf.getLong(
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
} else if (tezOp.isGroupBy()) {
bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT;
} else {
bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
}
}
// Use auto-parallelism feature of ShuffleVertexManager to dynamically
// reduce the parallelism of the vertex. Use PigGraceShuffleVertexManager
// instead of ShuffleVertexManager if pig.tez.grace.parallelism is turned on
if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
&& !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
&& tezOp.getCrossKeys() == null) {
vmPluginName = PigGraceShuffleVertexManager.class.getName();
tezOp.setUseGraceParallelism(true);
vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext);
vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, bytesPerReducer);
}
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
autoParallelism = true;
log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
}
}
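        // Auto parallelism may change vertex parallelism at runtime, which Tez
        // DAG recovery cannot replay reliably, hence the option to disable recovery.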
if (globalConf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY, false) && autoParallelism) {
disableDAGRecovery = true;
}
}
        if (tezOp.isLimit() && (vmPluginName == null
                || vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())
                || vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
if (tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName())) {
                // Set SRC_FRACTION to 0.00001 so that even if there are 100K source tasks,
                // the limit job starts when one source task finishes.
                // If the limit is part of a group by or join (their parallelism is 1),
                // we leave the configuration at the defaults.
vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf;
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, "0.00001");
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, "0.00001");
log.info("Set " + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001 for limit vertex " + tezOp.getOperatorKey().toString());
}
}
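        // With grace parallelism the vertex starts with parallelism -1 and
        // PigGraceShuffleVertexManager sets the real value at runtime.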
int parallel = tezOp.getVertexParallelism();
if (tezOp.isUseGraceParallelism()) {
parallel = -1;
}
Resource resource = tezOp.isUseMRMapSettings() ? mapTaskResource : reduceTaskResource;
Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), procDesc, parallel, resource);
if (tezOp.isUseMRMapSettings()) {
vertex.setTaskLaunchCmdOpts(mapTaskLaunchCmdOpts);
vertex.setTaskEnvironment(mapTaskEnv);
} else {
vertex.setTaskLaunchCmdOpts(reduceTaskLaunchCmdOpts);
vertex.setTaskEnvironment(reduceTaskEnv);
}
MRToTezHelper.setVertexConfig(vertex, tezOp.isUseMRMapSettings(), globalConf);
log.info("For vertex - " + tezOp.getOperatorKey().toString()
+ ": parallelism=" + tezOp.getVertexParallelism()
+ ", memory=" + vertex.getTaskResource().getMemory()
+ ", java opts=" + vertex.getTaskLaunchCmdOpts()
);
log.info("Processing aliases: " + alias);
log.info("Detailed locations: " + aliasLocation);
log.info("Pig features in the vertex: " + features);
// Right now there can only be one of each of these. Will need to be
// more generic when there can be more.
for (POLoad ld : tezOp.getLoaderInfo().getLoads()) {
// TODO: These should get the globalConf, or a merged version that
// keeps settings like pig.maxCombinedSplitSize
Builder userPayLoadBuilder = MRRuntimeProtos.MRInputUserPayloadProto.newBuilder();
InputSplitInfo inputSplitInfo = tezOp.getLoaderInfo().getInputSplitInfo();
Map<String, LocalResource> additionalLocalResources = null;
int spillThreshold = payloadConf
.getInt(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD,
PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD_DEFAULT);
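            // Small split sets are sent to the AM inline via RPC events; once the
            // serialized size crosses the threshold above, they are written to HDFS
            // and shipped as local resources instead.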
// Currently inputSplitInfo is always InputSplitInfoMem at this point
if (inputSplitInfo instanceof InputSplitInfoMem) {
MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
Pair<Long, Boolean> serializationInfo = TezInputHelper.createSplitsProto(inputSplitInfo, pigContextConf, splitsBuilder,
spillThreshold);
MRSplitsProto splitsProto = splitsBuilder.build();
                if (!serializationInfo.second) {
                    // Too large to send via RPC events; write the splits to disk
                    // and ship them as local resources instead
                    inputPayLoad.setBoolean(
                            org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
                            false);
Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
log.info("Writing input splits to " + inputSplitsDir
+ " for vertex " + vertex.getName()
+ " as the partially serialized size in memory is "
+ serializationInfo.first + ". Configured "
+ PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
+ " is " + spillThreshold);
inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
(InputSplitInfoMem)inputSplitInfo, inputSplitsDir, payloadConf, fs, splitsProto);
additionalLocalResources = new HashMap<String, LocalResource>();
MRToTezHelper.updateLocalResourcesForInputSplits(
fs, inputSplitInfo,
additionalLocalResources);
inputSplitInDiskVertices.add(vertex.getName());
} else {
// Send splits via RPC to AM
userPayLoadBuilder.setSplits(splitsProto);
}
//Free up memory
tezOp.getLoaderInfo().setInputSplitInfo(null);
}
udfContextSeparator.serializeUDFContext(inputPayLoad, tezOp, UDFType.LOADFUNC);
userPayLoadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(inputPayLoad));
vertex.setLocationHint(VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()));
vertex.addDataSource(ld.getOperatorKey().toString(),
DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
.setUserPayload(UserPayload.create(userPayLoadBuilder.build().toByteString().asReadOnlyByteBuffer())),
InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
inputSplitInfo.getNumTasks(),
dag.getCredentials(),
null,
additionalLocalResources));
}
        // A union within a split can have multiple stores writing to the same output
Set<String> uniqueStoreOutputs = new HashSet<String>();
for (POStore store : stores) {
ArrayList<POStore> singleStore = new ArrayList<POStore>();
singleStore.add(store);
Configuration outPayLoad = new Configuration(outputPayLoad);
udfContextSeparator.serializeUDFContext(outPayLoad, tezOp, store);
outPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES,
ObjectSerializer.serialize(singleStore));
OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
MROutput.class.getName()).setUserPayload(TezUtils
.createUserPayloadFromConf(outPayLoad));
if (tezOp.getVertexGroupStores() != null) {
OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey());
if (vertexGroupKey != null) {
getPlan().getOperator(vertexGroupKey).getVertexGroupInfo()
.setStoreOutputDescriptor(storeOutDescriptor);
continue;
}
}
String outputKey = ((POStoreTez) store).getOutputKey();
if (!uniqueStoreOutputs.contains(outputKey)) {
                vertex.addDataSink(outputKey,
DataSinkDescriptor.create(storeOutDescriptor,
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
dag.getCredentials()));
uniqueStoreOutputs.add(outputKey);
}
}
        // LoadFunc and StoreFunc add delegation tokens to the Job credentials in
        // setLocation and setStoreLocation respectively (e.g. HBaseStorage).
        // InputFormats add delegation tokens in getSplits and OutputFormats in
        // checkOutputSpecs (e.g. FileInputFormat and FileOutputFormat).
if (stores.size() > 0) {
new PigOutputFormat().checkOutputSpecs(job);
}
        // else if (tezOp.isLimitAfterSort())
        // TODO: PIG-4049 If standalone limit, we need a new VertexManager or new input
        // instead of ShuffledMergedInput. For the limit that is part of the sort itself
        // (order by parallel 1), PartitionerDefinedVertexManager needs to be enhanced.
if (vmPluginName != null) {
VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(vmPluginName);
if (vmPluginConf != null) {
vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf));
}
vertex.setVertexManagerPlugin(vmPluginDescriptor);
}
        // Reset the UDFContext jobconf. It is not supposed to be set in the front end.
UDFContext.getUDFContext().addJobConf(null);
return vertex;
}