demo_example/ReadFiles.cpp (118 lines of code) (raw):
/*
* Copyright (c) 2022, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <cstdint>
#include <fstream>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "async_simple/Future.h"
#include "async_simple/Promise.h"
#include "async_simple/coro/Lazy.h"
#include "async_simple/coro/SyncAwait.h"
#include "async_simple/executors/SimpleExecutor.h"
using namespace async_simple;
using namespace async_simple::coro;
using FileName = std::string;
/////////////////////////////////////////
/////////// Synchronous Part ////////////
/////////////////////////////////////////
// Helper for the synchronous example; readers interested in the
// sync/async/coroutine comparison can skip it.
//
// Returns how many times `c` occurs in the file named `File`. The file is
// read line by line with std::getline, so the line delimiters themselves are
// never part of the counted text. If the file cannot be opened, no line is
// read and the result is 0.
uint64_t CountCharInFileImpl(const FileName &File, char c) {
    std::ifstream Input(File);
    uint64_t Total = 0;
    for (std::string Line; std::getline(Input, Line);) {
        Total += std::count(Line.begin(), Line.end(), c);
    }
    return Total;
}
uint64_t CountCharInFiles(const std::vector<FileName> &Files, char c) {
uint64_t ReadSize = 0;
for (const auto &File : Files)
ReadSize += CountCharInFileImpl(File, c);
return ReadSize;
}
/////////////////////////////////////////
/////////// Asynchronous Part ///////////
/////////////////////////////////////////
// Minimal range concept: satisfied by any type for which the member calls
// `begin()` and `end()` are valid on an lvalue of that type.
template <typename RangeTy>
concept Range = requires(RangeTy &Rng) {
    Rng.begin();
    Rng.end();
};
// It is not trivial to implement an asynchronous do_for_each.
//
// Applies `func` to the elements of [Begin, End) one at a time, in order.
// `func` is invoked with `*It` and must return a future that is convertible
// to the Future<void> returned here.
//
// - If the future returned by `func` is already complete and holds an error,
//   that future is returned immediately and the remaining elements are
//   skipped.
// - If it is already complete without an error, iteration simply continues.
// - If it is still pending, the rest of the range is processed from a
//   continuation attached with thenTry, and the chained future is returned.
//
// The iterators and a copy of `func` are captured by that continuation, so
// the underlying sequence must stay alive until the returned future has
// completed.
//
// NOTE(review): the continuation ignores the Try it receives, so an error
// produced by a future that was still pending does not stop the iteration
// the way an already-available error does — confirm whether that is
// intended.
template <typename InputIt, typename Callable>
Future<void> do_for_each(InputIt Begin, InputIt End, Callable func) {
    for (auto It = Begin; It != End; ++It) {
        auto F = std::invoke(func, *It);
        // Only a completed future has a result to inspect, so readiness is
        // checked before F.result() is touched.
        if (!F.hasResult()) {
            return std::move(F).thenTry(
                [SubBegin = ++It, SubEnd = End, func](auto &&) {
                    return do_for_each(SubBegin, SubEnd, func);
                });
        }
        // If we met an error, return early.
        if (F.result().hasError())
            return F;
    }
    return makeReadyFuture();
}
// Convenience overload: runs do_for_each over the whole of `Rng`.
// Only iterators into `Rng` are handed on, so `Rng` itself is not copied.
template <Range RangeTy, typename Callable>
Future<void> do_for_each(const RangeTy &Rng, Callable func) {
    auto First = Rng.begin();
    auto Last = Rng.end();
    return do_for_each(First, Last, func);
}
// Helper for the future-based example; readers interested in the
// sync/async/coroutine comparison can skip it.
//
// Counts the occurrences of `c` in the file named `File` (line by line, line
// delimiters excluded) and hands the count back through makeReadyFuture.
// A file that cannot be opened yields a count of 0.
Future<uint64_t> CountCharInFileAsyncImpl(const FileName &File, char c) {
    std::ifstream Input(File);
    uint64_t Ret = 0;
    for (std::string Line; std::getline(Input, Line);) {
        Ret += std::count(Line.begin(), Line.end(), c);
    }
    // Pass an rvalue so the future's value type is deduced as uint64_t.
    return makeReadyFuture(std::move(Ret));
}
// Future-based version: counts the occurrences of `c` across all `Files`.
//
// The files are processed in order through do_for_each. Each per-file count
// is added to a shared accumulator, and the returned future yields the
// accumulated total once the chain has finished.
//
// `Files` is only borrowed: do_for_each works on iterators into the caller's
// vector, so the vector must stay alive until the returned future has
// completed.
//
// NOTE(review): the final continuation ignores the Try it receives and
// always returns the accumulated value — confirm that dropping a possible
// error here is intended.
Future<uint64_t> CountCharInFilesAsync(const std::vector<FileName> &Files,
                                       char c) {
    // std::shared_ptr is necessary here: the continuations below may run
    // after this function has returned, so the accumulator cannot be a
    // local variable.
    auto ReadSize = std::make_shared<uint64_t>(0);
    // We need to introduce another API `do_for_each` here. `Files` is passed
    // as the const reference it is; std::move on it could not move anything.
    return do_for_each(Files,
                       [ReadSize, c](auto &&File) {
                           return CountCharInFileAsyncImpl(File, c).thenValue(
                               [ReadSize](auto &&Size) {
                                   *ReadSize += Size;
                                   return makeReadyFuture();
                               });
                       })
        .thenTry([ReadSize](auto &&) {
            return makeReadyFuture<uint64_t>(*ReadSize);
        });
}
/////////////////////////////////////////
/////////// Coroutine Part //////////////
/////////////////////////////////////////
// Helper for the coroutine example; readers interested in the
// sync/async/coroutine comparison can skip it.
//
// Coroutine that counts the occurrences of `c` in the file named `File`
// (line by line, line delimiters excluded) and co_returns the count.
// A file that cannot be opened yields a count of 0.
Lazy<uint64_t> CountCharFileCoroImpl(const FileName &File, char c) {
    std::ifstream Input(File);
    uint64_t Total = 0;
    for (std::string Line; std::getline(Input, Line);) {
        Total += std::count(Line.begin(), Line.end(), c);
    }
    co_return Total;
}
// Coroutine version: awaits the per-file count for each entry of `Files` in
// order and co_returns the sum. Structurally this mirrors the synchronous
// CountCharInFiles, with `co_await` at the point where a file is processed.
Lazy<uint64_t> CountCharInFilesCoro(const std::vector<FileName> &Files,
                                    char c) {
    uint64_t Total = 0;
    for (auto It = Files.begin(); It != Files.end(); ++It) {
        const uint64_t InFile = co_await CountCharFileCoroImpl(*It, c);
        Total += InFile;
    }
    co_return Total;
}
int main() {
std::vector<FileName> Files = {
"demo_example/Input/1.txt", "demo_example/Input/2.txt",
"demo_example/Input/3.txt", "demo_example/Input/4.txt",
"demo_example/Input/5.txt", "demo_example/Input/6.txt",
"demo_example/Input/7.txt", "demo_example/Input/8.txt",
"demo_example/Input/9.txt", "demo_example/Input/10.txt",
};
std::cout << "Calculating char counts synchronously.\n";
auto ResSync = CountCharInFiles(Files, 'x');
std::cout << "Files contain " << ResSync << " 'x' chars.\n";
executors::SimpleExecutor executor(4);
Promise<Unit> p;
auto future = p.getFuture();
std::cout << "Calculating char counts asynchronously by future.\n";
auto f = std::move(future).via(&executor).thenTry([&Files](auto &&) {
return CountCharInFilesAsync(Files, 'x').thenValue([](auto &&Value) {
std::cout << "Files contain " << Value << " 'x' chars.\n";
});
});
p.setValue(Unit());
f.wait();
std::cout << "Calculating char counts asynchronously by coroutine.\n";
auto ResCoro = syncAwait(CountCharInFilesCoro(Files, 'x'), &executor);
std::cout << "Files contain " << ResCoro << " 'x' chars.\n";
return 0;
}