packages/dubbo-web/src/grpc-web-transport.ts (245 lines of code) (raw):
// Copyright 2021-2023 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import type {
AnyMessage,
BinaryReadOptions,
BinaryWriteOptions,
JsonReadOptions,
JsonWriteOptions,
MethodInfo,
PartialMessage,
ServiceType,
} from "@bufbuild/protobuf";
import { Message, MethodKind } from "@bufbuild/protobuf";
import type {
Interceptor,
StreamResponse,
Transport,
UnaryRequest,
UnaryResponse,
} from "apache-dubbo";
import {
createClientMethodSerializers,
createEnvelopeReadableStream,
createMethodUrl,
encodeEnvelope,
runStreamingCall,
runUnaryCall,
} from "apache-dubbo/protocol";
import {
requestHeader,
trailerFlag,
trailerParse,
validateResponse,
validateTrailer,
} from "apache-dubbo/protocol-grpc-web";
import { assertFetchApi } from "./assert-fetch-api.js";
/**
* Options used to configure the gRPC-web transport.
*
* See createGrpcWebTransport().
*/
export interface GrpcWebTransportOptions {
/**
* Base URI for all HTTP requests.
*
* Requests will be made to <baseUrl>/<package>.<service>/method
*
* Example: `baseUrl: "https://example.com/my-api"`
*
* This will make a `POST /my-api/my_package.MyService/Foo` to
* `example.com` via HTTPS.
*
* If your API is served from the same domain as your site, use
* `baseUrl: window.location.origin` or simply "/".
*/
baseUrl: string;
/**
* By default, clients use the binary format for gRPC-web, because
* not all gRPC-web implementations support JSON.
*/
useBinaryFormat?: boolean;
/**
* Interceptors that should be applied to all calls running through
* this transport. See the Interceptor type for details.
*/
interceptors?: Interceptor[];
/**
* Controls what the fetch client will do with credentials, such as
* Cookies. The default value is "same-origin". For reference, see
* https://fetch.spec.whatwg.org/#concept-request-credentials-mode
*/
credentials?: RequestCredentials;
/**
* Options for the JSON format.
* By default, unknown fields are ignored.
*/
jsonOptions?: Partial<JsonReadOptions & JsonWriteOptions>;
/**
* Options for the binary wire format.
*/
binaryOptions?: Partial<BinaryReadOptions & BinaryWriteOptions>;
/**
* Optional override of the fetch implementation used by the transport.
*/
fetch?: typeof globalThis.fetch;
}
/**
* Create a Transport for the gRPC-web protocol. The protocol encodes
* trailers in the response body and makes unary and server-streaming
* methods available to web browsers. It uses the fetch API to make
* HTTP requests.
*
* Note that this transport does not implement the grpc-web-text format,
* which applies base64 encoding to the request and response bodies to
* support reading streaming responses from an XMLHttpRequest.
*/
export function createGrpcWebTransport(
options: GrpcWebTransportOptions
): Transport {
assertFetchApi();
const useBinaryFormat = options.useBinaryFormat ?? true;
const fetch = options.fetch ?? globalThis.fetch;
return {
async unary<
I extends Message<I> = AnyMessage,
O extends Message<O> = AnyMessage
>(
service: ServiceType,
method: MethodInfo<I, O>,
signal: AbortSignal | undefined,
timeoutMs: number | undefined,
header: Headers,
message: PartialMessage<I>
): Promise<UnaryResponse<I, O>> {
const { normalize, serialize, parse } = createClientMethodSerializers(
method,
useBinaryFormat,
options.jsonOptions,
options.binaryOptions
);
return await runUnaryCall<I, O>({
interceptors: options.interceptors,
signal,
timeoutMs,
req: {
stream: false,
service,
method,
url: createMethodUrl(options.baseUrl, service, method),
init: {
method: "POST",
credentials: options.credentials ?? "same-origin",
redirect: "error",
mode: "cors",
},
header: requestHeader(useBinaryFormat, timeoutMs, header),
message: normalize(message),
},
next: async (req: UnaryRequest<I, O>): Promise<UnaryResponse<I, O>> => {
const response = await fetch(req.url, {
...req.init,
headers: req.header,
signal: req.signal,
body: encodeEnvelope(0, serialize(req.message)),
});
validateResponse(response.status, response.headers);
if (!response.body) {
throw "missing response body";
}
const reader = createEnvelopeReadableStream(
response.body
).getReader();
let trailer: Headers | undefined;
let message: O | undefined;
for (;;) {
const r = await reader.read();
if (r.done) {
break;
}
const { flags, data } = r.value;
if (flags === trailerFlag) {
if (trailer !== undefined) {
throw "extra trailer";
}
// Unary responses require exactly one response message, but in
// case of an error, it is perfectly valid to have a response body
// that only contains error trailers.
trailer = trailerParse(data);
continue;
}
if (message !== undefined) {
throw "extra message";
}
message = parse(data);
}
if (trailer === undefined) {
throw "missing trailer";
}
validateTrailer(trailer);
if (message === undefined) {
throw "missing message";
}
return <UnaryResponse<I, O>>{
stream: false,
header: response.headers,
message,
trailer,
};
},
});
},
async stream<
I extends Message<I> = AnyMessage,
O extends Message<O> = AnyMessage
>(
service: ServiceType,
method: MethodInfo<I, O>,
signal: AbortSignal | undefined,
timeoutMs: number | undefined,
header: HeadersInit | undefined,
input: AsyncIterable<I>
): Promise<StreamResponse<I, O>> {
const { serialize, parse } = createClientMethodSerializers(
method,
useBinaryFormat,
options.jsonOptions,
options.binaryOptions
);
async function* parseResponseBody(
body: ReadableStream<Uint8Array>,
foundStatus: boolean,
trailerTarget: Headers
) {
const reader = createEnvelopeReadableStream(body).getReader();
if (foundStatus) {
// A grpc-status: 0 response header was present. This is a "trailers-only"
// response (a response without a body and no trailers).
//
// The spec seems to disallow a trailers-only response for status 0 - we are
// lenient and only verify that the body is empty.
//
// > [...] Trailers-Only is permitted for calls that produce an immediate error.
// See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
if (!(await reader.read()).done) {
throw "extra data for trailers-only";
}
return;
}
let trailerReceived = false;
for (;;) {
const result = await reader.read();
if (result.done) {
break;
}
const { flags, data } = result.value;
if ((flags & trailerFlag) === trailerFlag) {
if (trailerReceived) {
throw "extra trailer";
}
trailerReceived = true;
const trailer = trailerParse(data);
validateTrailer(trailer);
trailer.forEach((value, key) => trailerTarget.set(key, value));
continue;
}
if (trailerReceived) {
throw "extra message";
}
yield parse(data);
continue;
}
if (!trailerReceived) {
throw "missing trailer";
}
}
async function createRequestBody(
input: AsyncIterable<I>
): Promise<Uint8Array> {
if (method.kind != MethodKind.ServerStreaming) {
throw "The fetch API does not support streaming request bodies";
}
const r = await input[Symbol.asyncIterator]().next();
if (r.done == true) {
throw "missing request message";
}
return encodeEnvelope(0, serialize(r.value));
}
return runStreamingCall<I, O>({
interceptors: options.interceptors,
signal,
timeoutMs,
req: {
stream: true,
service,
method,
url: createMethodUrl(options.baseUrl, service, method),
init: {
method: "POST",
credentials: options.credentials ?? "same-origin",
redirect: "error",
mode: "cors",
},
header: requestHeader(useBinaryFormat, timeoutMs, header),
message: input,
},
next: async (req) => {
const fRes = await fetch(req.url, {
...req.init,
headers: req.header,
signal: req.signal,
body: await createRequestBody(req.message),
});
const { foundStatus } = validateResponse(fRes.status, fRes.headers);
if (!fRes.body) {
throw "missing response body";
}
const trailer = new Headers();
const res: StreamResponse<I, O> = {
...req,
header: fRes.headers,
trailer,
message: parseResponseBody(fRes.body, foundStatus, trailer),
};
return res;
},
});
},
};
}