// cmd/pinpi/main.go (136 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"os"
"sync"
"time"
"github.com/googlecloudplatform/pi-delivery/gen/index"
"github.com/googlecloudplatform/pi-delivery/pkg/obj"
"github.com/googlecloudplatform/pi-delivery/pkg/obj/gcs"
"github.com/googlecloudplatform/pi-delivery/pkg/unpack"
"github.com/sethvargo/go-retry"
"go.uber.org/zap"
)
// Tuning parameters and the search pattern for the digit scan.
const (
// TOTAL_NUMBERS is not referenced anywhere in the visible code.
// NOTE(review): the digit grouping is unusual; the literal's value is
// 100,000,000. Confirm whether 1_000_000_000 was intended.
TOTAL_NUMBERS = 1_000_000_00
// DIGITS_PER_NUMBER is not referenced anywhere in the visible code.
DIGITS_PER_NUMBER = 8
// CHUNK_SIZE is the size (task.n) of every task that main enqueues, and the
// step by which the start offset advances from one task to the next.
CHUNK_SIZE = 100_000_000
// WORKERS is the number of worker goroutines that main starts.
WORKERS = 256
// SEQUENCE is the digit string that process looks for: the first 100 digits
// of pi, written without the decimal point.
SEQUENCE = "3141592653589793238462643383279502884197169399375105820974944592307816406286208998628034825342117067"
// MIN_MATCH is the length of the SEQUENCE prefix used as the search needle,
// and therefore the shortest match that process reports.
MIN_MATCH = 10
)
// logger is the package-level sugared logger. main assigns it before any
// worker is started; each worker derives its own child logger from it.
var logger *zap.SugaredLogger
// wg tracks the worker goroutines: main calls Add before launching each one,
// every worker calls Done when it returns, and main blocks on Wait.
var wg sync.WaitGroup
// workerContextKey is the key type for values stored in a worker's
// context.Context (the "workerId" value set by main and read by worker).
type workerContextKey string
// task describes one chunk of the digit stream to be scanned by a worker.
type task struct {
// start is the offset the reader seeks to before the chunk is read.
start int64
// n is the size of the chunk, counted from start.
n int64
// cancel is called by a worker when the task still fails after all retries;
// main sets it to the cancel function of the context shared by all workers.
cancel context.CancelFunc
}
// process scans one chunk of the digit stream for occurrences of SEQUENCE.
//
// It opens a reader on the decimal index through client, seeks to task.start
// and searches for the first MIN_MATCH characters of SEQUENCE. Every
// occurrence that starts inside the chunk [task.start, task.start+task.n) is
// extended for as long as it keeps agreeing with SEQUENCE (up to
// len(SEQUENCE)) and is printed to stdout as
//
//	<1-based position>, <match length>, <matched digits>
//
// After a match, the search resumes right behind the matched digits.
//
// The chunk is read in a single call together with len(SEQUENCE)-1 bytes of
// lookahead. This way a match that begins near the end of the chunk, including
// one whose first MIN_MATCH digits straddle the boundary with the next chunk,
// is found and extended using the bytes that really follow it. Matches that
// start inside the lookahead are left to the task that owns the next chunk.
//
// Reaching the end of the data early is not an error. Any other seek or read
// error is returned before anything is printed, so retrying a failed task
// never duplicates output.
func process(ctx context.Context, task *task, logger *zap.SugaredLogger, client obj.Client) error {
	logger.Infof("processing task, start = %d, n = %v", task.start, task.n)

	rrd := index.Decimal.NewReader(ctx, client.Bucket(index.BucketName))
	defer rrd.Close()

	urd := unpack.NewReader(ctx, rrd)
	if _, err := urd.Seek(task.start, io.SeekStart); err != nil {
		return fmt.Errorf("seeking to %d: %w", task.start, err)
	}

	// Read the chunk plus the lookahead needed to complete any match that
	// starts on the chunk's last byte.
	buf := make([]byte, task.n+int64(len(SEQUENCE)-1))
	n, err := io.ReadFull(urd, buf)
	// io.ReadFull reports a short read as io.EOF (nothing read) or
	// io.ErrUnexpectedEOF (partial read); both just mean the data ended.
	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		return fmt.Errorf("reading %d digits at %d: %w", task.n, task.start, err)
	}
	buf = buf[:n]

	// Only matches starting before limit belong to this chunk.
	limit := int(task.n)
	if n < limit {
		limit = n
	}

	prefix := []byte(SEQUENCE[:MIN_MATCH])
	for off := 0; off < limit; {
		i := bytes.Index(buf[off:], prefix)
		if i < 0 || off+i >= limit {
			break
		}
		pos := off + i

		// Extend the match beyond the prefix while the data agrees with SEQUENCE.
		matchLen := MIN_MATCH
		for matchLen < len(SEQUENCE) && pos+matchLen < len(buf) && buf[pos+matchLen] == SEQUENCE[matchLen] {
			matchLen++
		}

		fmt.Printf("%v, %v, %s\n",
			task.start+int64(pos+1), matchLen, string(buf[pos:pos+matchLen]))
		off = pos + matchLen
	}

	logger.Infof("digits processed: %d + %d digits",
		task.start, task.n)
	return nil
}
// worker consumes tasks from taskChan until the channel is closed or ctx is
// cancelled, and calls wg.Done when it returns.
//
// Each task is handed to process. A failing task is retried up to three times
// with exponential backoff starting at one second. If it still fails, the
// error is logged and task.cancel is called, which stops the whole run.
//
// The worker's log lines carry the "workerId" value found in ctx.
func worker(ctx context.Context, taskChan <-chan task, client obj.Client) {
	defer wg.Done()

	logger := logger.With("worker id", ctx.Value(workerContextKey("workerId")))
	// Deferred calls run last-in-first-out: the exit message is logged first,
	// then the logger is flushed, then the WaitGroup is released.
	defer logger.Sync()
	defer logger.Infow("worker exiting")
	logger.Info("worker started")

	for task := range taskChan {
		if ctx.Err() != nil {
			return
		}

		// go-retry backoffs are stateful: they count attempts across calls.
		// Build a fresh one per task so that every task gets its own retry
		// budget and starts again from the base delay.
		b := retry.WithMaxRetries(3, retry.NewExponential(1*time.Second))
		err := retry.Do(ctx, b, func(ctx context.Context) error {
			if err := process(ctx, &task, logger, client); err != nil {
				return retry.RetryableError(err)
			}
			return nil
		})
		if err != nil {
			logger.Errorw("process failed", "error", err)
			task.cancel()
		}
	}
}
// main sets up logging and the GCS client, starts WORKERS worker goroutines
// and feeds them one task per CHUNK_SIZE digits, beginning at the offset given
// by the -s flag and ending at index.Decimal.TotalDigits(). It then closes the
// task channel and waits for all workers to finish.
//
// All workers share one cancellable context. When a worker cancels it after a
// task has failed for good, main stops dispatching new tasks.
func main() {
	l, err := zap.NewDevelopment()
	if err != nil {
		fmt.Fprintf(os.Stderr, "couldn't create a logger: %v\n", err)
		os.Exit(1)
	}
	defer l.Sync()
	zap.ReplaceGlobals(l)
	logger = l.Sugar()

	start := flag.Int64("s", 0, "Start offset")
	flag.Parse()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := gcs.NewClient(ctx)
	if err != nil {
		logger.Errorf("couldn't create a GCS client: %v", err)
		// os.Exit skips deferred calls, so flush the log explicitly.
		l.Sync()
		os.Exit(1)
	}
	defer client.Close()

	taskChan := make(chan task, 256)
	for i := 0; i < WORKERS; i++ {
		wg.Add(1)
		// Derive each worker's context directly from ctx instead of
		// re-wrapping the previous worker's context.
		wctx := context.WithValue(ctx, workerContextKey("workerId"), i)
		go worker(wctx, taskChan, client)
	}

dispatch:
	for i := *start; i < index.Decimal.TotalDigits(); i += CHUNK_SIZE {
		t := task{
			start:  i,
			n:      CHUNK_SIZE,
			cancel: cancel,
		}
		// Workers stop receiving once ctx is cancelled, so a plain send could
		// block forever on a full channel; watch ctx while sending.
		select {
		case <-ctx.Done():
			logger.Errorf("context error: %v", ctx.Err())
			break dispatch
		case taskChan <- t:
		}
	}

	close(taskChan)
	wg.Wait()
}