public static LoadAwareCustomStreamGrouping mkGrouper()

in storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java [58:121]


    /**
     * Builds the stream grouper described by {@code thriftGrouping} and prepares it for the given stream.
     *
     * <p>The target tasks are copied and sorted in natural (ascending) order before being handed to the grouper.
     * For {@code LOCAL_OR_SHUFFLE}, the targets are narrowed to the tasks that also run in this worker
     * (as reported by {@code context.getThisWorkerTasks()}) whenever at least one such task exists; the
     * narrowed list is kept in ascending order as well.
     *
     * <p>If the grouping type is not one of the handled cases, no grouper is created: nothing is prepared and
     * the returned wrapper is constructed around {@code null}.
     *
     * @param context             worker topology context; passed to the grouper's {@code prepare} and used to find local tasks
     * @param componentId         component id used to build the {@link GlobalStreamId} passed to {@code prepare}
     * @param streamId            stream id used to build the {@link GlobalStreamId} passed to {@code prepare}
     * @param outFields           output fields handed to the fields grouper
     * @param thriftGrouping      thrift description of the grouping to build
     * @param unsortedTargetTasks candidate target task ids; a sorted copy is used instead of this list
     * @param topoConf            topology configuration; {@code Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING} is read from it
     * @return the created grouper if it already is a {@link LoadAwareCustomStreamGrouping}, otherwise the grouper
     *     wrapped in a {@code BasicLoadAwareCustomStreamGrouping}
     * @throws ClassCastException if the load-aware flag is present but is not a {@code Boolean}, or if a custom
     *     grouping object is not a {@link CustomStreamGrouping}
     */
    public static LoadAwareCustomStreamGrouping mkGrouper(WorkerTopologyContext context, String componentId, String streamId,
                                                          Fields outFields,
                                                          Grouping thriftGrouping,
                                                          List<Integer> unsortedTargetTasks,
                                                          Map<String, Object> topoConf) {
        List<Integer> targetTasks = Ordering.natural().sortedCopy(unsortedTargetTasks);
        // Read the flag once; an absent (or null) value means load-aware messaging stays enabled.
        final Object disableLoadAware = topoConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING);
        final boolean isNotLoadAware = null != disableLoadAware && (boolean) disableLoadAware;
        CustomStreamGrouping result = null;
        switch (Thrift.groupingType(thriftGrouping)) {
            case FIELDS:
                if (Thrift.isGlobalGrouping(thriftGrouping)) {
                    result = new GlobalGrouper();
                } else {
                    result = new FieldsGrouper(outFields, thriftGrouping);
                }
                break;
            case SHUFFLE:
                result = newShuffleGrouper(isNotLoadAware);
                break;
            case ALL:
                result = new AllGrouper();
                break;
            case LOCAL_OR_SHUFFLE:
                // Prefer local tasks as target tasks if possible
                Set<Integer> sameTasks = Sets.intersection(Sets.newHashSet(targetTasks), Sets.newHashSet(context.getThisWorkerTasks()));
                if (!sameTasks.isEmpty()) {
                    // The intersection iterates in hash order; re-sort so the grouper receives
                    // ascending task ids just like every other grouping type.
                    targetTasks = Ordering.natural().sortedCopy(sameTasks);
                }
                result = newShuffleGrouper(isNotLoadAware);
                break;
            case NONE:
                result = new NoneGrouper();
                break;
            case CUSTOM_OBJECT:
                result = (CustomStreamGrouping) Thrift.instantiateJavaObject(thriftGrouping.get_custom_object());
                break;
            case CUSTOM_SERIALIZED:
                result = Utils.javaDeserialize(thriftGrouping.get_custom_serialized(), CustomStreamGrouping.class);
                break;
            case DIRECT:
                result = DIRECT;
                break;
            default:
                // Unhandled grouping type: leave result as null so nothing is prepared below.
                result = null;
                break;
        }

        if (null != result) {
            result.prepare(context, new GlobalStreamId(componentId, streamId), targetTasks);
        }

        // Groupers that are not load aware are adapted so callers always get the load-aware interface.
        if (result instanceof LoadAwareCustomStreamGrouping) {
            return (LoadAwareCustomStreamGrouping) result;
        } else {
            return new BasicLoadAwareCustomStreamGrouping(result);
        }
    }

    /**
     * Creates the shuffle grouper shared by the {@code SHUFFLE} and {@code LOCAL_OR_SHUFFLE} grouping types.
     *
     * @param isNotLoadAware {@code true} to create a plain {@link ShuffleGrouping}, {@code false} to create a
     *                       {@link LoadAwareShuffleGrouping}
     * @return a new, not yet prepared shuffle grouper
     */
    private static CustomStreamGrouping newShuffleGrouper(boolean isNotLoadAware) {
        if (isNotLoadAware) {
            return new ShuffleGrouping();
        }
        return new LoadAwareShuffleGrouping();
    }