wangle/example/file/FileServer.cpp (77 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/portability/GFlags.h>
#include <folly/init/Init.h>
#include <wangle/bootstrap/ServerBootstrap.h>
#include <wangle/channel/AsyncSocketHandler.h>
#include <wangle/channel/FileRegion.h>
#include <wangle/codec/LineBasedFrameDecoder.h>
#include <wangle/codec/StringCodec.h>
using namespace folly;
using namespace wangle;
DEFINE_int32(port, 11219, "test file server port");
typedef Pipeline<IOBufQueue&, std::string> FileServerPipeline;
class FileServerHandler : public HandlerAdapter<std::string> {
public:
void read(Context* ctx, std::string filename) override {
if (filename == "bye") {
close(ctx);
}
int fd = open(filename.c_str(), O_RDONLY);
if (fd == -1) {
write(
ctx, sformat("Error opening {}: {}\r\n", filename, strerror(errno)));
return;
}
struct stat buf;
if (fstat(fd, &buf) == -1) {
write(
ctx,
sformat("Could not stat file {}: {}\r\n", filename, strerror(errno)));
return;
}
FileRegion fileRegion(fd, 0, buf.st_size);
auto guard = ctx->getPipelineShared();
fileRegion.transferTo(ctx->getTransport())
.thenError(
folly::tag_t<std::exception>{},
[this, guard, ctx, filename](const std::exception& e) {
write(
ctx,
sformat(
"Error sending file {}: {}\r\n",
filename,
exceptionStr(e)));
});
}
void readException(Context* ctx, exception_wrapper ew) override {
write(ctx, sformat("Error: {}\r\n", exceptionStr(ew)))
.thenValue([this, ctx](auto&&) { close(ctx); });
}
void transportActive(Context* ctx) override {
SocketAddress localAddress;
ctx->getTransport()->getLocalAddress(&localAddress);
write(ctx, "Welcome to " + localAddress.describe() + "!\r\n");
write(ctx, "Type the name of a file and it will be streamed to you!\r\n");
write(ctx, "Type 'bye' to exit.\r\n");
}
};
class FileServerPipelineFactory : public PipelineFactory<FileServerPipeline> {
public:
FileServerPipeline::Ptr newPipeline(
std::shared_ptr<AsyncTransport> sock) override {
auto pipeline = FileServerPipeline::create();
pipeline->addBack(AsyncSocketHandler(sock));
pipeline->addBack(LineBasedFrameDecoder());
pipeline->addBack(StringCodec());
pipeline->addBack(FileServerHandler());
pipeline->finalize();
return pipeline;
}
};
int main(int argc, char** argv) {
folly::Init init(&argc, &argv);
ServerBootstrap<FileServerPipeline> server;
server.childPipeline(std::make_shared<FileServerPipelineFactory>());
server.bind(FLAGS_port);
server.waitForStop();
return 0;
}