protected void runTest()

in testing/axiom-testsuite/src/main/java/org/apache/axiom/ts/soap12/envelope/TestMTOMForwardStreaming.java [56:158]


    protected void runTest() throws Throwable {
        Blob blob1 = new TestBlob('A', Runtime.getRuntime().maxMemory());
        Blob blob2 = new TestBlob('B', Runtime.getRuntime().maxMemory());

        // Programmatically create the original message
        SOAPFactory factory = metaFactory.getSOAP12Factory();
        final SOAPEnvelope orgEnvelope = factory.createSOAPEnvelope();
        SOAPBody orgBody = factory.createSOAPBody(orgEnvelope);
        OMElement orgBodyElement =
                factory.createOMElement(
                        "test", factory.createOMNamespace("urn:test", "p"), orgBody);
        OMElement orgData1 = factory.createOMElement("data", null, orgBodyElement);
        orgData1.addChild(factory.createOMText(blob1, true));
        OMElement orgData2 = factory.createOMElement("data", null, orgBodyElement);
        orgData2.addChild(factory.createOMText(blob2, true));

        OMOutputFormat format = new OMOutputFormat();
        format.setDoOptimize(true);
        format.setSOAP11(false);
        String contentType = format.getContentType();

        PipedOutputStream pipe1Out = new PipedOutputStream();
        PipedInputStream pipe1In = new PipedInputStream(pipe1Out);

        // Create the producer thread (simulating the client sending the MTOM message)
        Thread producerThread =
                new Thread(
                        () -> {
                            try {
                                try {
                                    orgEnvelope.serialize(pipe1Out, format);
                                } finally {
                                    pipe1Out.close();
                                }
                            } catch (Exception ex) {
                                ex.printStackTrace();
                            }
                        });
        producerThread.start();

        PipedOutputStream pipe2Out = new PipedOutputStream();
        PipedInputStream pipe2In = new PipedInputStream(pipe2Out);

        // Create the forwarder thread (simulating the mediation engine that forwards the message)
        Thread forwarderThread =
                new Thread(
                        () -> {
                            try {
                                try {
                                    MultipartBody mb =
                                            MultipartBody.builder()
                                                    .setInputStream(pipe1In)
                                                    .setContentType(contentType)
                                                    .build();
                                    SOAPEnvelope envelope =
                                            OMXMLBuilderFactory.createSOAPModelBuilder(
                                                            metaFactory, mb)
                                                    .getSOAPEnvelope();
                                    // The code path executed by serializeAndConsume is
                                    // significantly different if
                                    // the element is built. Therefore we need two different
                                    // test executions.
                                    if (buildSOAPPart) {
                                        envelope.build();
                                    }
                                    // Usage of serializeAndConsume should enable streaming
                                    envelope.serializeAndConsume(pipe2Out, format);
                                } finally {
                                    pipe2Out.close();
                                }
                            } catch (Exception ex) {
                                ex.printStackTrace();
                            }
                        });
        forwarderThread.start();

        try {
            MultipartBody mb =
                    MultipartBody.builder()
                            .setInputStream(pipe2In)
                            .setContentType(contentType)
                            .build();
            SOAPEnvelope envelope =
                    OMXMLBuilderFactory.createSOAPModelBuilder(metaFactory, mb).getSOAPEnvelope();
            OMElement bodyElement = envelope.getBody().getFirstElement();
            Iterator<OMElement> it = bodyElement.getChildElements();
            OMElement data1 = it.next();
            OMElement data2 = it.next();

            IOTestUtils.compareStreams(
                    blob1.getInputStream(),
                    ((PartBlob) ((OMText) data1.getFirstOMChild()).getBlob())
                            .getPart()
                            .getInputStream(false));
            IOTestUtils.compareStreams(
                    blob2.getInputStream(),
                    ((PartBlob) ((OMText) data2.getFirstOMChild()).getBlob())
                            .getPart()
                            .getInputStream(false));
        } finally {
            pipe2In.close();
        }
    }