// origin/blobserver/server.go

// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package blobserver

import (
"encoding/json"
"fmt"
"io"
"net/http"
_ "net/http/pprof" // Registers /debug/pprof endpoints in http.DefaultServeMux.
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend"
"github.com/uber/kraken/lib/backend/backenderrors"
"github.com/uber/kraken/lib/blobrefresh"
"github.com/uber/kraken/lib/hashring"
"github.com/uber/kraken/lib/metainfogen"
"github.com/uber/kraken/lib/middleware"
"github.com/uber/kraken/lib/persistedretry"
"github.com/uber/kraken/lib/persistedretry/writeback"
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/lib/store/metadata"
"github.com/uber/kraken/origin/blobclient"
"github.com/uber/kraken/utils/errutil"
"github.com/uber/kraken/utils/handler"
"github.com/uber/kraken/utils/httputil"
"github.com/uber/kraken/utils/listener"
"github.com/uber/kraken/utils/log"
"github.com/uber/kraken/utils/memsize"
"github.com/uber/kraken/utils/stringset"
"github.com/andres-erbsen/clock"
"github.com/go-chi/chi"
"github.com/uber-go/tally"
)

// _uploadChunkSize is the chunk size used when uploading blobs in pieces.
const _uploadChunkSize = 16 * memsize.MB
// Server defines a server that serves blob data for agents.
type Server struct {
config Config
stats tally.Scope
clk clock.Clock
addr string
hashRing hashring.Ring
cas *store.CAStore
clientProvider blobclient.Provider
clusterProvider blobclient.ClusterProvider
backends *backend.Manager
blobRefresher *blobrefresh.Refresher
metaInfoGenerator *metainfogen.Generator
uploader *uploader
writeBackManager persistedretry.Manager
// This is an unfortunate coupling between the p2p client and the blob server.
// The tracker queries the origin cluster to discover which origins can seed
// a given torrent; however, this requires the blob server to understand the
// context of the p2p client running alongside it.
pctx core.PeerContext
}
// New initializes a new Server.
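//
// A minimal wiring sketch (illustrative only; the variable names here are
// assumptions, not taken from this file):
//
//	srv, err := New(cfg, stats, clock.New(), addr, ring, cas, provider,
//		clusterProvider, pctx, backends, refresher, generator, wbManager)
//	if err != nil {
//		log.Fatal(err)
//	}
//	log.Fatal(srv.ListenAndServe(srv.Handler()))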
func New(
config Config,
stats tally.Scope,
clk clock.Clock,
addr string,
hashRing hashring.Ring,
cas *store.CAStore,
clientProvider blobclient.Provider,
clusterProvider blobclient.ClusterProvider,
pctx core.PeerContext,
backends *backend.Manager,
blobRefresher *blobrefresh.Refresher,
metaInfoGenerator *metainfogen.Generator,
writeBackManager persistedretry.Manager) (*Server, error) {
config = config.applyDefaults()
stats = stats.Tagged(map[string]string{
"module": "blobserver",
})
return &Server{
config: config,
stats: stats,
clk: clk,
addr: addr,
hashRing: hashRing,
cas: cas,
clientProvider: clientProvider,
clusterProvider: clusterProvider,
backends: backends,
blobRefresher: blobRefresher,
metaInfoGenerator: metaInfoGenerator,
uploader: newUploader(cas),
writeBackManager: writeBackManager,
pctx: pctx,
}, nil
}
// Addr returns the address the blob server is configured on.
func (s *Server) Addr() string {
return s.addr
}
// Handler returns an http handler for the blob server.
func (s *Server) Handler() http.Handler {
r := chi.NewRouter()
r.Use(middleware.StatusCounter(s.stats))
r.Use(middleware.LatencyTimer(s.stats))
// Public endpoints:
r.Get("/health", handler.Wrap(s.healthCheckHandler))
r.Get("/readiness", handler.Wrap(s.readinessCheckHandler))
r.Get("/blobs/{digest}/locations", handler.Wrap(s.getLocationsHandler))
r.Post("/namespace/{namespace}/blobs/{digest}/uploads", handler.Wrap(s.startClusterUploadHandler))
r.Patch("/namespace/{namespace}/blobs/{digest}/uploads/{uid}", handler.Wrap(s.patchClusterUploadHandler))
r.Put("/namespace/{namespace}/blobs/{digest}/uploads/{uid}", handler.Wrap(s.commitClusterUploadHandler))
r.Get("/namespace/{namespace}/blobs/{digest}", handler.Wrap(s.downloadBlobHandler))
r.Post("/namespace/{namespace}/blobs/{digest}/remote/{remote}", handler.Wrap(s.replicateToRemoteHandler))
r.Post("/forcecleanup", handler.Wrap(s.forceCleanupHandler))
// Internal endpoints:
r.Post("/internal/blobs/{digest}/uploads", handler.Wrap(s.startTransferHandler))
r.Patch("/internal/blobs/{digest}/uploads/{uid}", handler.Wrap(s.patchTransferHandler))
r.Put("/internal/blobs/{digest}/uploads/{uid}", handler.Wrap(s.commitTransferHandler))
r.Delete("/internal/blobs/{digest}", handler.Wrap(s.deleteBlobHandler))
r.Post("/internal/blobs/{digest}/metainfo", handler.Wrap(s.overwriteMetaInfoHandler))
r.Get("/internal/peercontext", handler.Wrap(s.getPeerContextHandler))
r.Head("/internal/namespace/{namespace}/blobs/{digest}", handler.Wrap(s.statHandler))
r.Get("/internal/namespace/{namespace}/blobs/{digest}/metainfo", handler.Wrap(s.getMetaInfoHandler))
r.Put(
"/internal/duplicate/namespace/{namespace}/blobs/{digest}/uploads/{uid}",
handler.Wrap(s.duplicateCommitClusterUploadHandler))
r.Mount("/", http.DefaultServeMux) // Serves /debug/pprof endpoints.
return r
}
// ListenAndServe is a blocking call which runs s, using h as the root handler.
func (s *Server) ListenAndServe(h http.Handler) error {
log.Infof("Starting blob server on %s", s.config.Listener)
return listener.Serve(s.config.Listener, h)
}
func (s *Server) healthCheckHandler(w http.ResponseWriter, r *http.Request) error {
fmt.Fprintln(w, "OK")
return nil
}
func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) error {
err := s.backends.CheckReadiness()
if err != nil {
return handler.Errorf("not ready to serve traffic: %s", err).Status(http.StatusServiceUnavailable)
}
fmt.Fprintln(w, "OK")
return nil
}
// statHandler returns blob info if the blob exists.
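// The optional `local` query argument (default false) limits the check to the
// local cache, skipping the storage backend lookup. For example (the namespace
// and digest below are illustrative):
//
//	HEAD /internal/namespace/foo/blobs/<digest>?local=true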
func (s *Server) statHandler(w http.ResponseWriter, r *http.Request) error {
checkLocal, err := strconv.ParseBool(httputil.GetQueryArg(r, "local", "false"))
if err != nil {
return handler.Errorf("parse arg `local` as bool: %s", err)
}
namespace, err := httputil.ParseParam(r, "namespace")
if err != nil {
return err
}
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
bi, err := s.stat(namespace, d, checkLocal)
if os.IsNotExist(err) {
return handler.ErrorStatus(http.StatusNotFound)
} else if err != nil {
return fmt.Errorf("stat: %s", err)
}
w.Header().Set("Content-Length", strconv.FormatInt(bi.Size, 10))
log.Debugf("successfully check blob %s exists", d.Hex())
return nil
}
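
// stat returns blob info for d, checking the local cache first and falling
// back to the storage backend for namespace unless checkLocal is set. Returns
// os.ErrNotExist if the blob is found in neither place.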
func (s *Server) stat(namespace string, d core.Digest, checkLocal bool) (*core.BlobInfo, error) {
fi, err := s.cas.GetCacheFileStat(d.Hex())
if err == nil {
return core.NewBlobInfo(fi.Size()), nil
} else if os.IsNotExist(err) {
if !checkLocal {
client, err := s.backends.GetClient(namespace)
if err != nil {
return nil, fmt.Errorf("get backend client: %s", err)
}
if bi, err := client.Stat(namespace, d.Hex()); err == nil {
return bi, nil
} else if err == backenderrors.ErrBlobNotFound {
return nil, os.ErrNotExist
} else {
return nil, fmt.Errorf("backend stat: %s", err)
}
}
return nil, err // os.ErrNotExist
}
return nil, fmt.Errorf("stat cache file: %s", err)
}
func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) error {
namespace, err := httputil.ParseParam(r, "namespace")
if err != nil {
return err
}
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
if err := s.downloadBlob(namespace, d, w); err != nil {
log.With("namespace", namespace).Errorf("Error downloading blob: %s", err)
return err
}
setOctetStreamContentType(w)
return nil
}
func (s *Server) replicateToRemoteHandler(w http.ResponseWriter, r *http.Request) error {
namespace, err := httputil.ParseParam(r, "namespace")
if err != nil {
return err
}
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
remote, err := httputil.ParseParam(r, "remote")
if err != nil {
return err
}
return s.replicateToRemote(namespace, d, remote)
}
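
// replicateToRemote uploads the blob for d to the remote origin cluster
// addressed by remoteDNS. If the blob is not cached locally, an asynchronous
// backend download is started instead, surfacing a "202 Accepted" error while
// it is pending.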
func (s *Server) replicateToRemote(namespace string, d core.Digest, remoteDNS string) error {
f, err := s.cas.GetCacheFileReader(d.Hex())
if err != nil {
if os.IsNotExist(err) {
return s.startRemoteBlobDownload(namespace, d, false)
}
return handler.Errorf("file store: %s", err)
}
defer f.Close()
remote, err := s.clusterProvider.Provide(remoteDNS)
if err != nil {
return handler.Errorf("remote cluster provider: %s", err)
}
return remote.UploadBlob(namespace, d, f)
}
// deleteBlobHandler deletes blob data.
func (s *Server) deleteBlobHandler(w http.ResponseWriter, r *http.Request) error {
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
if err := s.deleteBlob(d); err != nil {
return err
}
setContentLength(w, 0)
w.WriteHeader(http.StatusAccepted)
log.Debugf("successfully delete blob %s", d.Hex())
return nil
}
func (s *Server) getLocationsHandler(w http.ResponseWriter, r *http.Request) error {
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
locs := s.hashRing.Locations(d)
w.Header().Set("Origin-Locations", strings.Join(locs, ","))
w.WriteHeader(http.StatusOK)
return nil
}
// getPeerContextHandler returns the Server's peer context as JSON.
func (s *Server) getPeerContextHandler(w http.ResponseWriter, r *http.Request) error {
if err := json.NewEncoder(w).Encode(s.pctx); err != nil {
return handler.Errorf("error converting peer context to json: %s", err)
}
return nil
}
func (s *Server) getMetaInfoHandler(w http.ResponseWriter, r *http.Request) error {
namespace, err := httputil.ParseParam(r, "namespace")
if err != nil {
return err
}
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
raw, err := s.getMetaInfo(namespace, d)
if err != nil {
return err
}
w.Write(raw)
return nil
}
func (s *Server) overwriteMetaInfoHandler(w http.ResponseWriter, r *http.Request) error {
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
pieceLength, err := strconv.ParseInt(r.URL.Query().Get("piece_length"), 10, 64)
if err != nil {
return handler.Errorf("invalid piece_length argument: %s", err).Status(http.StatusBadRequest)
}
return s.overwriteMetaInfo(d, pieceLength)
}
// overwriteMetaInfo generates metainfo configured with pieceLength for d and
// writes it to disk, overwriting any existing metainfo. Primarily intended for
// benchmarking purposes.
func (s *Server) overwriteMetaInfo(d core.Digest, pieceLength int64) error {
f, err := s.cas.GetCacheFileReader(d.Hex())
if err != nil {
return handler.Errorf("get cache file: %s", err)
}
defer f.Close()
mi, err := core.NewMetaInfo(d, f, pieceLength)
if err != nil {
return handler.Errorf("create metainfo: %s", err)
}
if _, err := s.cas.SetCacheFileMetadata(d.Hex(), metadata.NewTorrentMeta(mi)); err != nil {
return handler.Errorf("set metainfo: %s", err)
}
return nil
}
// getMetaInfo returns metainfo for d. If no blob exists under d, a download of
// the blob from the storage backend configured for namespace will be initiated.
// This download is asynchronous and getMetaInfo will immediately return a
// "202 Accepted" server error.
func (s *Server) getMetaInfo(namespace string, d core.Digest) ([]byte, error) {
var tm metadata.TorrentMeta
if err := s.cas.GetCacheFileMetadata(d.Hex(), &tm); os.IsNotExist(err) {
return nil, s.startRemoteBlobDownload(namespace, d, true)
} else if err != nil {
return nil, handler.Errorf("get cache metadata: %s", err)
}
return tm.Serialize()
}
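
// localReplicationHook replicates a freshly refreshed blob to the other
// origins which own it, emitting timing and error metrics for the attempt.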
type localReplicationHook struct {
server *Server
}
func (h *localReplicationHook) Run(d core.Digest) {
timer := h.server.stats.Timer("replicate_blob").Start()
if err := h.server.replicateBlobLocally(d); err != nil {
// Don't return error here as we only want to cache storage backend errors.
log.With("blob", d.Hex()).Errorf("Error replicating remote blob: %s", err)
h.server.stats.Counter("replicate_blob_errors").Inc(1)
return
}
timer.Stop()
}
func (s *Server) startRemoteBlobDownload(
namespace string, d core.Digest, replicateLocally bool) error {
var hooks []blobrefresh.PostHook
if replicateLocally {
hooks = append(hooks, &localReplicationHook{s})
}
err := s.blobRefresher.Refresh(namespace, d, hooks...)
switch err {
case blobrefresh.ErrPending, nil:
return handler.ErrorStatus(http.StatusAccepted)
case blobrefresh.ErrNotFound:
return handler.ErrorStatus(http.StatusNotFound)
case blobrefresh.ErrWorkersBusy:
return handler.ErrorStatus(http.StatusServiceUnavailable)
default:
return err
}
}
func (s *Server) replicateBlobLocally(d core.Digest) error {
return s.applyToReplicas(d, func(i int, client blobclient.Client) error {
f, err := s.cas.GetCacheFileReader(d.Hex())
if err != nil {
return fmt.Errorf("get cache reader: %s", err)
}
defer f.Close()
if err := client.TransferBlob(d, f); err != nil {
return fmt.Errorf("transfer blob: %s", err)
}
return nil
})
}
// applyToReplicas applies f to the replicas of d concurrently in random order,
// not including the current origin. Passes the index of the iteration to f.
func (s *Server) applyToReplicas(d core.Digest, f func(i int, c blobclient.Client) error) error {
replicas := stringset.FromSlice(s.hashRing.Locations(d))
replicas.Remove(s.addr)
var mu sync.Mutex
var errs []error
var wg sync.WaitGroup
var i int
for replica := range replicas {
wg.Add(1)
go func(i int, replica string) {
defer wg.Done()
if err := f(i, s.clientProvider.Provide(replica)); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}(i, replica)
i++
}
wg.Wait()
return errutil.Join(errs)
}
// downloadBlob downloads blob for d into dst. If no blob exists under d, a
// download of the blob from the storage backend configured for namespace will
// be initiated. This download is asynchronous and downloadBlob will immediately
// return a "202 Accepted" handler error.
func (s *Server) downloadBlob(namespace string, d core.Digest, dst io.Writer) error {
f, err := s.cas.GetCacheFileReader(d.Hex())
if os.IsNotExist(err) {
return s.startRemoteBlobDownload(namespace, d, true)
} else if err != nil {
return handler.Errorf("get cache file: %s", err)
}
defer f.Close()
if _, err := io.Copy(dst, f); err != nil {
return handler.Errorf("copy blob: %s", err)
}
return nil
}
func (s *Server) deleteBlob(d core.Digest) error {
if err := s.cas.DeleteCacheFile(d.Hex()); err != nil {
if os.IsNotExist(err) {
return handler.ErrorStatus(http.StatusNotFound)
}
return handler.Errorf("cannot delete blob data for digest %q: %s", d, err)
}
return nil
}
// startTransferHandler initializes an upload for internal blob transfers.
func (s *Server) startTransferHandler(w http.ResponseWriter, r *http.Request) error {
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
if ok, err := blobExists(s.cas, d); err != nil {
return handler.Errorf("check blob: %s", err)
} else if ok {
return handler.ErrorStatus(http.StatusConflict)
}
uid, err := s.uploader.start(d)
if err != nil {
return err
}
setUploadLocation(w, uid)
w.WriteHeader(http.StatusOK)
return nil
}
// patchTransferHandler uploads a chunk of a blob for internal uploads.
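// The chunk's byte range comes from the Content-Range request header, parsed
// by parseContentRange (defined elsewhere in this package); a plain
// "<start>-<end>" form is assumed here, e.g.:
//
//	PATCH /internal/blobs/<digest>/uploads/<uid>
//	Content-Range: 0-4194304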
func (s *Server) patchTransferHandler(w http.ResponseWriter, r *http.Request) error {
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
uid, err := httputil.ParseParam(r, "uid")
if err != nil {
return err
}
start, end, err := parseContentRange(r.Header)
if err != nil {
return err
}
return s.uploader.patch(d, uid, r.Body, start, end)
}
// commitTransferHandler commits the upload of an internal blob transfer.
// Internal blob transfers are not replicated to the rest of the cluster.
func (s *Server) commitTransferHandler(w http.ResponseWriter, r *http.Request) error {
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
uid, err := httputil.ParseParam(r, "uid")
if err != nil {
return err
}
if err := s.uploader.commit(d, uid); err != nil {
return err
}
if err := s.metaInfoGenerator.Generate(d); err != nil {
return handler.Errorf("generate metainfo: %s", err)
}
return nil
}
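
// handleUploadConflict intercepts "409 Conflict" upload errors, which indicate
// the blob already exists, and ensures a write-back task has been scheduled
// for it before propagating the original error.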
func (s *Server) handleUploadConflict(err error, namespace string, d core.Digest) error {
if herr, ok := err.(*handler.Error); ok && herr.GetStatus() == http.StatusConflict {
// Even if the blob was already uploaded and committed to cache, it's
// still possible that adding the write-back task failed. Clients
// short-circuit on conflict and return success, so we must make sure that if
// we tell a client to stop before commit, the blob has been written back.
if err := s.writeBack(namespace, d, 0); err != nil {
return err
}
}
return err
}
// startClusterUploadHandler initializes an external blob upload.
func (s *Server) startClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
namespace, err := httputil.ParseParam(r, "namespace")
if err != nil {
return err
}
uid, err := s.uploader.start(d)
if err != nil {
return s.handleUploadConflict(err, namespace, d)
}
setUploadLocation(w, uid)
w.WriteHeader(http.StatusOK)
return nil
}
// patchClusterUploadHandler uploads a chunk of a blob for external uploads.
func (s *Server) patchClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
namespace, err := httputil.ParseParam(r, "namespace")
if err != nil {
return err
}
uid, err := httputil.ParseParam(r, "uid")
if err != nil {
return err
}
start, end, err := parseContentRange(r.Header)
if err != nil {
return err
}
if err := s.uploader.patch(d, uid, r.Body, start, end); err != nil {
return s.handleUploadConflict(err, namespace, d)
}
return nil
}
// commitClusterUploadHandler commits an external blob upload asynchronously,
// meaning the blob will be written back to remote storage in a non-blocking
// fashion.
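//
// Commit proceeds in three steps, all visible in the body below: commit the
// upload into the local cache, schedule a write-back task to remote storage,
// and fan out staggered duplicate write-backs to the blob's replicas as a
// safety net (duplicate failures are logged and counted, not returned).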
func (s *Server) commitClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
namespace, err := httputil.ParseParam(r, "namespace")
if err != nil {
return err
}
uid, err := httputil.ParseParam(r, "uid")
if err != nil {
return err
}
if err := s.uploader.commit(d, uid); err != nil {
return s.handleUploadConflict(err, namespace, d)
}
if err := s.writeBack(namespace, d, 0); err != nil {
return err
}
err = s.applyToReplicas(d, func(i int, client blobclient.Client) error {
delay := s.config.DuplicateWriteBackStagger * time.Duration(i+1)
f, err := s.cas.GetCacheFileReader(d.Hex())
if err != nil {
return fmt.Errorf("get cache file: %s", err)
}
defer f.Close()
if err := client.DuplicateUploadBlob(namespace, d, f, delay); err != nil {
return fmt.Errorf("duplicate upload: %s", err)
}
return nil
})
if err != nil {
s.stats.Counter("duplicate_write_back_errors").Inc(1)
log.Errorf("Error duplicating write-back task to replicas: %s", err)
}
return nil
}
// duplicateCommitClusterUploadHandler commits a duplicate blob upload, which
// will attempt a write-back after the requested delay.
func (s *Server) duplicateCommitClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
return err
}
namespace, err := httputil.ParseParam(r, "namespace")
if err != nil {
return err
}
uid, err := httputil.ParseParam(r, "uid")
if err != nil {
return err
}
var dr blobclient.DuplicateCommitUploadRequest
if err := json.NewDecoder(r.Body).Decode(&dr); err != nil {
return handler.Errorf("decode body: %s", err)
}
delay := dr.Delay
if err := s.uploader.commit(d, uid); err != nil {
return err
}
return s.writeBack(namespace, d, delay)
}
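
// writeBack marks d as persisted, enqueues an asynchronous task to upload d
// to the storage backend for namespace after the given delay, and generates
// torrent metainfo for the blob.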
func (s *Server) writeBack(namespace string, d core.Digest, delay time.Duration) error {
if _, err := s.cas.SetCacheFileMetadata(d.Hex(), metadata.NewPersist(true)); err != nil {
return handler.Errorf("set persist metadata: %s", err)
}
task := writeback.NewTask(namespace, d.Hex(), delay)
if err := s.writeBackManager.Add(task); err != nil {
return handler.Errorf("add write-back task: %s", err)
}
if err := s.metaInfoGenerator.Generate(d); err != nil {
return handler.Errorf("generate metainfo: %s", err)
}
return nil
}
func (s *Server) forceCleanupHandler(w http.ResponseWriter, r *http.Request) error {
// Note: this API is intended to be executed manually (i.e. curl), hence the
// query arguments, usage of hours instead of nanoseconds, and JSON response
// enumerating deleted files / errors.
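// Example invocation (host and port are placeholders):
//
//	curl -X POST "http://<origin>:<port>/forcecleanup?ttl_hr=168"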
rawTTLHr := r.URL.Query().Get("ttl_hr")
if rawTTLHr == "" {
return handler.Errorf("query arg ttl_hr required").Status(http.StatusBadRequest)
}
ttlHr, err := strconv.Atoi(rawTTLHr)
if err != nil {
return handler.Errorf("invalid ttl_hr: %s", err).Status(http.StatusBadRequest)
}
ttl := time.Duration(ttlHr) * time.Hour
names, err := s.cas.ListCacheFiles()
if err != nil {
return err
}
var errs, deleted []string
for _, name := range names {
if ok, err := s.maybeDelete(name, ttl); err != nil {
errs = append(errs, fmt.Sprintf("%s: %s", name, err))
} else if ok {
deleted = append(deleted, name)
}
}
return json.NewEncoder(w).Encode(map[string]interface{}{
"deleted": deleted,
"errors": errs,
})
}
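
// maybeDelete deletes the cache file named name if it has expired past ttl or
// if this origin no longer owns it on the hash ring. Files marked as persisted
// are only deleted after any pending write-back tasks have been synchronously
// executed, so data is safely stored remotely before removal.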
func (s *Server) maybeDelete(name string, ttl time.Duration) (deleted bool, err error) {
d, err := core.NewSHA256DigestFromHex(name)
if err != nil {
return false, fmt.Errorf("parse digest: %s", err)
}
info, err := s.cas.GetCacheFileStat(name)
if err != nil {
return false, fmt.Errorf("store: %s", err)
}
expired := s.clk.Now().Sub(info.ModTime()) > ttl
owns := stringset.FromSlice(s.hashRing.Locations(d)).Has(s.addr)
if expired || !owns {
// Ensure file is backed up properly before deleting.
var pm metadata.Persist
if err := s.cas.GetCacheFileMetadata(name, &pm); err != nil && !os.IsNotExist(err) {
return false, fmt.Errorf("store: %s", err)
}
if pm.Value {
// Note: It is possible that no writeback tasks exist, but the file
// is persisted. We classify this as a leaked file which is safe to
// delete.
tasks, err := s.writeBackManager.Find(writeback.NewNameQuery(name))
if err != nil {
return false, fmt.Errorf("find writeback tasks: %s", err)
}
for _, task := range tasks {
if err := s.writeBackManager.SyncExec(task); err != nil {
return false, fmt.Errorf("writeback: %s", err)
}
}
if err := s.cas.DeleteCacheFileMetadata(name, &metadata.Persist{}); err != nil {
return false, fmt.Errorf("delete persist: %s", err)
}
}
if err := s.cas.DeleteCacheFile(name); err != nil {
return false, fmt.Errorf("delete: %s", err)
}
return true, nil
}
return false, nil
}