in cmd/gitaly-lfs-smudge/smudge.go [66:288]
func process(ctx context.Context, cfg smudge.Config, to io.Writer, from io.Reader, logger log.Logger) error {
client, err := gitlab.NewHTTPClient(logger, cfg.Gitlab, cfg.TLS, prometheus.Config{})
if err != nil {
return fmt.Errorf("creating HTTP client: %w", err)
}
scanner := pktline.NewScanner(from)
writer := bufio.NewWriter(to)
buf := make([]byte, pktline.MaxPktSize-4)
var content bytes.Buffer
clientSupportsVersion2 := false
clientSupportsSmudgeCapability := false
state := processStateAnnounce
for scanner.Scan() {
line := scanner.Bytes()
var data []byte
if !pktline.IsFlush(line) {
payload, err := pktline.Payload(line)
if err != nil {
return fmt.Errorf("getting payload: %w", err)
}
data = payload
}
switch state {
case processStateAnnounce:
if !bytes.Equal(data, []byte("git-filter-client\n")) {
return fmt.Errorf("invalid client %q", string(data))
}
state = processStateVersions
case processStateVersions:
// The client will announce one or more supported versions to us. We need to
// collect them all in order to determine whether we do in fact support one
// of the announced versions.
if !pktline.IsFlush(line) {
if !bytes.HasPrefix(data, []byte("version=")) {
return fmt.Errorf("expected version, got %q", string(data))
}
// We only support version two of this protocol, so we have to check
// whether that version is announced by the client.
if bytes.Equal(data, []byte("version=2\n")) {
clientSupportsVersion2 = true
}
break
}
// We have gotten a flush packet, so the client is done announcing its
// versions. If we haven't seen our version announced then it's time to
// quit.
if !clientSupportsVersion2 {
return fmt.Errorf("client does not support version 2")
}
// Announce that we're a server and that we're talking version 2 of this
// protocol.
if _, err := pktline.WriteString(writer, "git-filter-server\n"); err != nil {
return fmt.Errorf("announcing server presence: %w", err)
}
if _, err := pktline.WriteString(writer, "version=2\n"); err != nil {
return fmt.Errorf("announcing server version: %w", err)
}
if err := pktline.WriteFlush(writer); err != nil {
return fmt.Errorf("flushing announcement: %w", err)
}
state = processStateCapabilities
case processStateCapabilities:
// Similar as above, the client will now announce all the capabilities it
// supports. We only support the "smudging" capability.
if !pktline.IsFlush(line) {
if !bytes.HasPrefix(data, []byte("capability=")) {
return fmt.Errorf("expected capability, got: %q", string(data))
}
// We only support smudging contents.
if bytes.Equal(data, []byte("capability=smudge\n")) {
clientSupportsSmudgeCapability = true
}
break
}
// If the client doesn't support smudging then we're done.
if !clientSupportsSmudgeCapability {
return fmt.Errorf("client does not support smudge capability")
}
// Announce that the only capability we support ourselves is smudging.
if _, err := pktline.WriteString(writer, "capability=smudge\n"); err != nil {
return fmt.Errorf("announcing smudge capability: %w", err)
}
if err := pktline.WriteFlush(writer); err != nil {
return fmt.Errorf("flushing capability announcement: %w", err)
}
state = processStateCommand
case processStateCommand:
// We're now in the processing loop where the client may announce one or
// more smudge commands.
if !bytes.Equal(data, []byte("command=smudge\n")) {
return fmt.Errorf("expected smudge command, got %q", string(data))
}
state = processStateSmudgeMetadata
case processStateSmudgeMetadata:
// The client sends us various information about the blob like the path
// name or treeish. We don't care about that information, so we just wait
// until we get the flush packet.
if !pktline.IsFlush(line) {
break
}
content.Reset()
state = processStateSmudgeContent
case processStateSmudgeContent:
// When we receive a flush packet we know that the client is done sending us
// the clean data.
if pktline.IsFlush(line) {
smudgedReader, err := smudgeOneObject(ctx, cfg, client, &content, logger)
if err != nil {
logger.WithError(err).Error("failed smudging LFS pointer")
if _, err := pktline.WriteString(writer, "status=error\n"); err != nil {
return fmt.Errorf("reporting failure: %w", err)
}
if err := pktline.WriteFlush(writer); err != nil {
return fmt.Errorf("flushing error: %w", err)
}
state = processStateCommand
break
}
defer smudgedReader.Close()
if _, err := pktline.WriteString(writer, "status=success\n"); err != nil {
return fmt.Errorf("sending status: %w", err)
}
if err := pktline.WriteFlush(writer); err != nil {
return fmt.Errorf("flushing status: %w", err)
}
// Read the smudged results in batches and relay it to the client.
// Because pktlines are limited in size we only ever read at most
// that many bytes.
var isEOF bool
for !isEOF {
n, err := smudgedReader.Read(buf)
if err != nil {
if !errors.Is(err, io.EOF) {
return fmt.Errorf("reading smudged contents: %w", err)
}
isEOF = true
}
if n > 0 {
if _, err := pktline.WriteString(writer, string(buf[:n])); err != nil {
return fmt.Errorf("writing smudged contents: %w", err)
}
}
}
smudgedReader.Close()
// We're done writing the smudged contents to the client, so we need
// to tell the client.
if err := pktline.WriteFlush(writer); err != nil {
return fmt.Errorf("flushing smudged contents: %w", err)
}
// We now have the opportunity to correct the status in case an
// error happened. For now we don't bother though and just abort the
// whole process in case we failed to read an LFS object, and that's
// why we just flush a second time.
if err := pktline.WriteFlush(writer); err != nil {
return fmt.Errorf("flushing final status: %w", err)
}
// We are now ready to accept another command.
state = processStateCommand
break
}
// Write the pktline into our buffer. Ideally, we could avoid slurping the
// whole content into memory first. But unfortunately, this is impossible in
// the context of long-running processes: the server-side _must not_ answer
// to the client before it has received all contents. And in the case we got
// a non-LFS-pointer as input, this means we have to slurp in all of its
// contents so that we can echo it back to the caller.
if _, err := content.Write(data); err != nil {
return fmt.Errorf("could not write clean data: %w", err)
}
}
if err := writer.Flush(); err != nil {
return fmt.Errorf("could not flush: %w", err)
}
}
if err := scanner.Err(); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("error scanning input: %w", err)
}
if state != processStateCommand {
return fmt.Errorf("unexpected termination in state %v", state)
}
return nil
}