in python/src/pywy/platforms/jvm/serializable/plan_writter.py [0:0]
def send_message_to_wayang(self):
    """Serialize the plan held in ``self.originals`` and POST it to Wayang.

    Every operator in ``self.originals`` is converted to a proto message
    through ``add_proto_unary_operator``, ``add_proto_source_operator`` or
    ``add_proto_sink_operator``. The proto messages are then linked using
    each operator's ``previous`` / ``nexts`` collections, wrapped in a
    ``WayangPlanProto`` together with a context that lists the ``java``
    platform, serialized, base64-encoded and sent as the ``message`` form
    field to ``http://localhost:8080/plan/create``.

    Raises:
        PywyException: if an operator is not unary, or if it is neither an
            operator, a source nor a sink.
    """
    # Maps each original operator to the proto message built for it, so the
    # wiring pass below can translate neighbours into proto ids.
    connections = {}
    sources = []
    sinks = []
    operators = []

    for operator in self.originals:
        if not operator.is_unary():
            # The original message had no placeholders, so the arguments
            # handed to format() were silently discarded.
            raise PywyException(
                "the not unary operator are not supported: "
                "type {} for the operator {}".format(
                    type(operator),
                    operator
                )
            )

        if operator.is_operator():
            connections[operator] = self.add_proto_unary_operator(operator)
            operators.append(connections[operator])
        elif operator.is_source():
            connections[operator] = self.add_proto_source_operator(operator)
            sources.append(connections[operator])
        elif operator.is_sink():
            connections[operator] = self.add_proto_sink_operator(operator)
            sinks.append(connections[operator])
        else:
            raise PywyException(
                "the type {} for the operator {} is not supported {}".format(
                    type(operator),
                    operator,
                    WayangJVMTextFileSink.mro()
                )
            )

    # Second pass: every proto message exists now, so predecessor and
    # successor ids can be resolved.
    # NOTE(review): a neighbour that is not part of self.originals makes
    # connections.get() return None and the ``.id`` access fail with
    # AttributeError -- confirm whether that can happen.
    for operator in self.originals:
        current = connections[operator]
        for ele in operator.previous:
            current.predecessors.append(connections.get(ele).id)
        for ele in operator.nexts:
            current.successors.append(connections.get(ele).id)

    plan_configuration = pwb.WayangPlanProto()

    plan = pwb.PlanProto()
    plan.sources.extend(sources)
    plan.operators.extend(operators)
    plan.sinks.extend(sinks)
    plan.input = pwb.PlanProto.string
    plan.output = pwb.PlanProto.string

    ctx = pwb.ContextProto()
    ctx.platforms.extend([pwb.ContextProto.PlatformProto.java])

    plan_configuration.plan.CopyFrom(plan)
    plan_configuration.context.CopyFrom(ctx)

    print("plan!")
    print(plan_configuration)

    msg_bytes = plan_configuration.SerializeToString()
    # The serialized plan is base64-encoded before being placed in the
    # form field of the request.
    msg_64 = base64.b64encode(msg_bytes)

    logging.debug(msg_bytes)

    data = {
        'message': msg_64
    }
    # NOTE(review): no timeout is passed, so this call waits for as long as
    # the server takes to answer -- confirm whether that is intended.
    response = requests.post("http://localhost:8080/plan/create", data)
    logging.debug(response)