plugins/experimental/wasm/lib/include/proxy-wasm/context.h (307 lines of code) (raw):
// Copyright 2016-2019 Envoy Project Authors
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <atomic>
#include <chrono>
#include <ctime>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <vector>
#include "include/proxy-wasm/context_interface.h"
namespace proxy_wasm {
#include "proxy_wasm_common.h"
#include "proxy_wasm_enums.h"
class PluginHandleBase;
class WasmBase;
class WasmVm;
/**
* PluginBase is a container to hold plugin information which is shared with all Context(s) created
* for a given plugin. Embedders may extend this class with additional host-specific plugin
* information as required.
* @param name is the name of the plugin.
* @param root_id is an identifier for the in-VM handlers for this plugin.
* @param vm_id is a string used to differentiate VMs with the same code and VM configuration.
* @param engine is the name of the engine associated with this plugin.
* @param plugin_configuration is the configuration for this plugin.
* @param fail_open if true the plugin will pass traffic as opposed to closing all streams.
* @param key is used to uniquely identify this plugin instance.
*/
struct PluginBase {
  // Takes an owned copy of every string_view argument, so the resulting object does not depend on
  // the lifetime of the caller's buffers.
  PluginBase(std::string_view name, std::string_view root_id, std::string_view vm_id,
             std::string_view engine, std::string_view plugin_configuration, bool fail_open,
             std::string_view key)
      : name_(name), root_id_(root_id), vm_id_(vm_id), engine_(engine),
        plugin_configuration_(plugin_configuration), fail_open_(fail_open),
        // The stored key is derived from root_id, plugin_configuration and the caller's key.
        key_(makePluginKey(root_id, plugin_configuration, key)),
        // makeLogPrefix() is a member call on a partially constructed object: it may only rely on
        // the members declared above log_prefix_, which are already initialized at this point.
        log_prefix_(makeLogPrefix()) {}

  // Immutable plugin description, fixed at construction.
  const std::string name_;
  const std::string root_id_;
  const std::string vm_id_;
  const std::string engine_;
  const std::string plugin_configuration_;
  const bool fail_open_;

  // Read-only access to the derived key and the precomputed log prefix.
  const std::string &key() const { return key_; }
  const std::string &log_prefix() const { return log_prefix_; }

private:
  // Both helpers are defined out of line.
  std::string makeLogPrefix() const;
  static std::string makePluginKey(std::string_view root_id, std::string_view plugin_configuration,
                                   std::string_view key);

  // Declared last on purpose: members are initialized in declaration order, and log_prefix_ is
  // computed by a member function.
  const std::string key_;
  const std::string log_prefix_;
};
/**
 * BufferBase is a BufferInterface implementation backed by one of two sources:
 *  - a non-owning std::string_view (data_), or
 *  - an owned char array (owned_data_) together with its length (owned_data_size_).
 * When owned data is present it takes precedence over the view.
 */
struct BufferBase : public BufferInterface {
  BufferBase() = default;
  ~BufferBase() override = default;

  // BufferInterface

  // Returns the number of bytes currently held: the owned length if an owned array is set,
  // otherwise the length of the viewed data.
  size_t size() const override {
    if (owned_data_) {
      return owned_data_size_;
    }
    return data_.size();
  }

  // Defined out of line.
  WasmResult copyTo(WasmBase *wasm, size_t start, size_t length, uint64_t ptr_ptr,
                    uint64_t size_ptr) const override;

  WasmResult copyFrom(size_t /* start */, size_t /* length */,
                      std::string_view /* data */) override {
    // Setting a string buffer not supported (no use case).
    return WasmResult::BadArgument;
  }

  // Drops both the view and any owned array, returning the buffer to the empty state.
  virtual void clear() {
    data_ = "";
    owned_data_ = nullptr;
    // Keep the recorded length consistent with the (now absent) owned array so that code reading
    // owned_data_size_ directly never observes a stale value.
    owned_data_size_ = 0;
  }

  // Points the buffer at caller-owned data. The view is stored as-is, so `data` must remain valid
  // for as long as the buffer refers to it. Returns this to allow chaining.
  BufferBase *set(std::string_view data) {
    clear();
    data_ = data;
    return this;
  }

  // Takes ownership of `owned_data`, which holds `owned_data_size` bytes. Returns this to allow
  // chaining.
  BufferBase *set(std::unique_ptr<char[]> owned_data, uint32_t owned_data_size) {
    clear();
    owned_data_ = std::move(owned_data);
    owned_data_size_ = owned_data_size;
    return this;
  }

protected:
  std::string_view data_;
  std::unique_ptr<char[]> owned_data_;
  // Zero-initialized so a default-constructed buffer never carries an indeterminate length.
  uint32_t owned_data_size_{0};
};
/**
* ContextBase is the interface between the VM host and the VM. It has several uses:
*
* 1) To provide host-specific implementations of ABI calls out of the VM. For example, a proxy
* which wants to provide the ability to make an HTTP call must implement the
* ContextBase::httpCall() method.
*
* 2) To call into the VM. For example, when the above mentioned httpCall() completes, the host must
* call ContextBase::onHttpCallResponse(). Similarly, when a new HTTP request arrives and the
* headers are available, the host must create a new ContextBase object to manage the new stream and
* call onRequestHeaders() on that object which will cause a corresponding Context to be allocated
* in the VM which will receive the proxy_on_context_create and proxy_on_request_headers calls.
*
* 3) For testing and instrumentation the methods of ContextBase can be replaced or augmented.
*/
class ContextBase : public RootInterface,
                    public HttpInterface,
                    public NetworkInterface,
                    public StreamInterface,
                    public HeaderInterface,
                    public HttpCallInterface,
                    public GrpcCallInterface,
                    public GrpcStreamInterface,
                    public MetricsInterface,
                    public SharedDataInterface,
                    public SharedQueueInterface,
                    public GeneralInterface {
public:
  // One constructor per kind of context; all are defined out of line.
  ContextBase();                                                          // Testing.
  ContextBase(WasmBase *wasm);                                            // Vm Context.
  ContextBase(WasmBase *wasm, const std::shared_ptr<PluginBase> &plugin); // Root Context.
  ContextBase(WasmBase *wasm, uint32_t parent_context_id,
              const std::shared_ptr<PluginHandleBase> &plugin_handle); // Stream context.
  virtual ~ContextBase();

  // Returns the WasmBase this context belongs to (nullptr unless a constructor set it).
  WasmBase *wasm() const { return wasm_; }
  // Returns the id of this context.
  uint32_t id() const { return id_; }
  // The VM Context used for calling "malloc" has an id_ == 0.
  bool isVmContext() const { return id_ == 0; }
  // Root Contexts have the VM Context as a parent.
  bool isRootContext() const { return parent_context_id_ == 0; }
  // Returns the direct parent of this context.
  ContextBase *parent_context() const { return parent_context_; }

  // Walks up the parent chain and returns the first context that is its own parent (for a
  // self-parented context that is the context itself). Returns nullptr if the chain ends in a
  // null link instead, rather than dereferencing it.
  ContextBase *root_context() const {
    const ContextBase *previous = this;
    ContextBase *parent = parent_context_;
    // parent_context_ defaults to nullptr, so guard every step of the walk.
    while (parent != nullptr && parent != previous) {
      previous = parent;
      parent = parent->parent_context_;
    }
    return parent;
  }

  // Root contexts answer from their own copy; every other context defers to its plugin, so
  // plugin_ must be set when this is called on a non-root context.
  std::string_view root_id() const { return isRootContext() ? root_id_ : plugin_->root_id_; }
  // Same split as root_id(): own prefix for root contexts, the plugin's prefix otherwise.
  std::string_view log_prefix() const {
    return isRootContext() ? root_log_prefix_ : plugin_->log_prefix();
  }
  WasmVm *wasmVm() const;

  // Called before deleting the context.
  virtual void destroy();

  /**
   * Calls into the VM.
   * These are implemented by the proxy-independent host code. They are virtual to support some
   * types of testing.
   */

  // Context
  void onCreate() override;
  bool onDone() override;
  void onLog() override;
  void onDelete() override;
  void onForeignFunction(uint32_t foreign_function_id, uint32_t data_size) override;

  // Root
  bool onStart(std::shared_ptr<PluginBase> plugin) override;
  bool onConfigure(std::shared_ptr<PluginBase> plugin) override;
  void onTick(TimerToken token) override;
  void onQueueReady(SharedQueueDequeueToken token) override;

  // HTTP
  FilterHeadersStatus onRequestHeaders(uint32_t headers, bool end_of_stream) override;
  FilterDataStatus onRequestBody(uint32_t body_length, bool end_of_stream) override;
  FilterTrailersStatus onRequestTrailers(uint32_t trailers) override;
  FilterMetadataStatus onRequestMetadata(uint32_t elements) override;
  FilterHeadersStatus onResponseHeaders(uint32_t headers, bool end_of_stream) override;
  FilterDataStatus onResponseBody(uint32_t body_length, bool end_of_stream) override;
  FilterTrailersStatus onResponseTrailers(uint32_t trailers) override;
  FilterMetadataStatus onResponseMetadata(uint32_t elements) override;

  // Network
  FilterStatus onNetworkNewConnection() override;
  FilterStatus onDownstreamData(uint32_t data_length, bool end_of_stream) override;
  FilterStatus onUpstreamData(uint32_t data_length, bool end_of_stream) override;
  void onDownstreamConnectionClose(CloseType) override;
  void onUpstreamConnectionClose(CloseType) override;

  // Async call response.
  void onHttpCallResponse(HttpCallToken token, uint32_t headers, uint32_t body_size,
                          uint32_t trailers) override;

  // Grpc
  void onGrpcReceiveInitialMetadata(GrpcToken token, uint32_t elements) override;
  void onGrpcReceive(GrpcToken token, uint32_t response_size) override;
  void onGrpcReceiveTrailingMetadata(GrpcToken token, uint32_t trailers) override;
  void onGrpcClose(GrpcToken token, GrpcStatusCode status_code) override;

  // Default error handler: writes the message to stderr and aborts the process. Embedders can
  // override this to report errors differently.
  void error(std::string_view message) override {
    std::cerr << message << "\n";
    abort();
  }
  // Shared fallback for every host call this base class does not implement. With the default
  // error() this never returns; the return value only matters when error() has been overridden
  // so that it returns.
  WasmResult unimplemented() override {
    error("unimplemented proxy-wasm API");
    return WasmResult::Unimplemented;
  }

  bool isFailed();
  // Reads the fail-open flag of the plugin; plugin_ must be set when this is called.
  bool isFailOpen() { return plugin_->fail_open_; }

  //
  // General Callbacks.
  //
  // Unless noted otherwise, the inline defaults below report "unimplemented" and, where a value
  // must still be returned, hand back a placeholder.
  WasmResult log(uint32_t /* level */, std::string_view /* message */) override {
    return unimplemented();
  }
  uint32_t getLogLevel() override {
    unimplemented();
    return 0;
  }
  uint64_t getCurrentTimeNanoseconds() override {
    unimplemented();
    return 0;
  }
  uint64_t getMonotonicTimeNanoseconds() override {
    unimplemented();
    return 0;
  }
  std::string_view getConfiguration() override {
    unimplemented();
    return "";
  }
  std::pair<uint32_t, std::string_view> getStatus() override {
    unimplemented();
    return std::make_pair(1, "unimplmemented");
  }
  WasmResult setTimerPeriod(std::chrono::milliseconds period, uint32_t *timer_token_ptr) override;

  // Buffer
  BufferInterface *getBuffer(WasmBufferType /* type */) override {
    unimplemented();
    return nullptr;
  }
  bool endOfStream(WasmStreamType /* type */) override {
    unimplemented();
    return true;
  }

  // HTTP
  WasmResult httpCall(std::string_view /* target */, const Pairs & /*request_headers */,
                      std::string_view /* request_body */, const Pairs & /* request_trailers */,
                      int /* timeout_millisconds */, uint32_t * /* token_ptr */) override {
    return unimplemented();
  }

  // gRPC
  WasmResult grpcCall(std::string_view /* grpc_service */, std::string_view /* service_name */,
                      std::string_view /* method_name */, const Pairs & /* initial_metadata */,
                      std::string_view /* request */, std::chrono::milliseconds /* timeout */,
                      GrpcToken * /* token_ptr */) override {
    return unimplemented();
  }
  WasmResult grpcStream(std::string_view /* grpc_service */, std::string_view /* service_name */,
                        std::string_view /* method_name */, const Pairs & /* initial_metadata */,
                        GrpcToken * /* token_ptr */) override {
    return unimplemented();
  }
  WasmResult grpcClose(uint32_t /* token */) override { // cancel on call, close on stream.
    return unimplemented();
  }
  WasmResult grpcCancel(uint32_t /* token */) override { // cancel on call, reset on stream.
    return unimplemented();
  }
  WasmResult grpcSend(uint32_t /* token */, std::string_view /* message */,
                      bool /* end_stream */) override { // stream only
    return unimplemented();
  }

  // Metrics
  WasmResult defineMetric(uint32_t /* type */, std::string_view /* name */,
                          uint32_t * /* metric_id_ptr */) override {
    return unimplemented();
  }
  WasmResult incrementMetric(uint32_t /* metric_id */, int64_t /* offset */) override {
    return unimplemented();
  }
  WasmResult recordMetric(uint32_t /* metric_id */, uint64_t /* value */) override {
    return unimplemented();
  }
  WasmResult getMetric(uint32_t /* metric_id */, uint64_t * /* value_ptr */) override {
    return unimplemented();
  }

  // Properties
  WasmResult getProperty(std::string_view /* path */, std::string * /* result */) override {
    return unimplemented();
  }
  WasmResult setProperty(std::string_view /* key */,
                         std::string_view /* serialized_value */) override {
    return unimplemented();
  }

  // Continue
  WasmResult continueStream(WasmStreamType /* stream_type */) override { return unimplemented(); }
  WasmResult closeStream(WasmStreamType /* stream_type */) override { return unimplemented(); }
  WasmResult sendLocalResponse(uint32_t /* response_code */, std::string_view /* body_text */,
                               Pairs /* additional_headers */, GrpcStatusCode /* grpc_status */,
                               std::string_view /* details */) override {
    return unimplemented();
  }
  void clearRouteCache() override { unimplemented(); }
  // Failing a stream is delegated to closeStream(); its result is discarded.
  void failStream(WasmStreamType stream_type) override { closeStream(stream_type); }

  // Shared Data (implemented out of line by this class, not left unimplemented).
  WasmResult getSharedData(std::string_view key,
                           std::pair<std::string, uint32_t /* cas */> *data) override;
  WasmResult setSharedData(std::string_view key, std::string_view value, uint32_t cas) override;
  WasmResult getSharedDataKeys(std::vector<std::string> *result) override;
  WasmResult removeSharedDataKey(std::string_view key, uint32_t cas,
                                 std::pair<std::string, uint32_t> *result) override;

  // Shared Queue (implemented out of line by this class, not left unimplemented).
  WasmResult registerSharedQueue(std::string_view queue_name,
                                 SharedQueueDequeueToken *token_ptr) override;
  WasmResult lookupSharedQueue(std::string_view vm_id, std::string_view queue_name,
                               SharedQueueEnqueueToken *token_ptr) override;
  WasmResult dequeueSharedQueue(uint32_t token, std::string *data) override;
  WasmResult enqueueSharedQueue(uint32_t token, std::string_view value) override;

  // Header/Trailer/Metadata Maps
  WasmResult addHeaderMapValue(WasmHeaderMapType /* type */, std::string_view /* key */,
                               std::string_view /* value */) override {
    return unimplemented();
  }
  WasmResult getHeaderMapValue(WasmHeaderMapType /* type */, std::string_view /* key */,
                               std::string_view * /*result */) override {
    return unimplemented();
  }
  WasmResult getHeaderMapPairs(WasmHeaderMapType /* type */, Pairs * /* result */) override {
    return unimplemented();
  }
  WasmResult setHeaderMapPairs(WasmHeaderMapType /* type */, const Pairs & /* pairs */) override {
    return unimplemented();
  }
  WasmResult removeHeaderMapValue(WasmHeaderMapType /* type */,
                                  std::string_view /* key */) override {
    return unimplemented();
  }
  WasmResult replaceHeaderMapValue(WasmHeaderMapType /* type */, std::string_view /* key */,
                                   std::string_view /* value */) override {
    return unimplemented();
  }
  WasmResult getHeaderMapSize(WasmHeaderMapType /* type */, uint32_t * /* result */) override {
    return unimplemented();
  }

protected:
  friend class WasmBase;

  std::string makeRootLogPrefix(std::string_view vm_id) const;

  WasmBase *wasm_{nullptr};
  uint32_t id_{0};
  uint32_t parent_context_id_{0};                   // 0 for roots and the general context.
  ContextBase *parent_context_{nullptr};            // set in all contexts.
  std::string root_id_;                             // set only in root context.
  std::string root_log_prefix_;                     // set only in root context.
  std::shared_ptr<PluginBase> plugin_;              // set in root and stream contexts.
  std::shared_ptr<PluginHandleBase> plugin_handle_; // set only in stream context.
  std::shared_ptr<PluginBase> temp_plugin_;         // Remove once ABI v0.1.0 is gone.
  bool in_vm_context_created_ = false;
  bool destroyed_ = false;
  bool stream_failed_ = false; // Set true after failStream is called in case of VM failure.

private:
  // helper functions
  // Each one maps the raw 64-bit result of a VM call onto the matching filter status enum;
  // all are defined out of line.
  FilterHeadersStatus convertVmCallResultToFilterHeadersStatus(uint64_t result);
  FilterDataStatus convertVmCallResultToFilterDataStatus(uint64_t result);
  FilterTrailersStatus convertVmCallResultToFilterTrailersStatus(uint64_t result);
  FilterMetadataStatus convertVmCallResultToFilterMetadataStatus(uint64_t result);
};
class DeferAfterCallActions {
public:
DeferAfterCallActions(ContextBase *context) : wasm_(context->wasm()) {}
~DeferAfterCallActions();
private:
WasmBase *const wasm_;
};
// Test-only helper that takes a VM id and a queue name; defined out of line.
// NOTE(review): presumably returns the token of the shared queue registered under that
// (vm_id, queue_name) pair - confirm against the definition.
uint32_t resolveQueueForTest(std::string_view vm_id, std::string_view queue_name);
} // namespace proxy_wasm