statefun-sdk-go/v3/pkg/statefun/internal/cell.go (112 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
	"io"
	"log"
)

const (
	// smallBufferSize is the minimal capacity of the initial allocation
	// made when an empty (nil) cell is first grown.
	// This is the same constant as used in bytes.Buffer.
	smallBufferSize = 64

	// maxInt is the largest value representable by int on the current
	// platform; grow compares against it to detect capacity overflow.
	maxInt = int(^uint(0) >> 1)
)

// Cell is a mutable, persisted value.
// This struct is not thread safe.
type Cell struct {
	buf          []byte // unread contents are the bytes buf[off : len(buf)]; GetStateMutation emits all of buf
	off          int    // read at &buf[off], write at &buf[len(buf)]
	mutated      bool   // tracks whether the cell has been written to or deleted
	hasValue     bool   // tracks whether the cell currently holds a valid value
	typeTypeName string // the typename of the type whose serialized contents are stored in the cell
}

// NewCell creates and initializes a new Cell using state's value as its
// initial contents. The new Cell takes ownership of state, and the
// caller should not use state after this call.
func NewCell(state *protocol.ToFunction_PersistedValue, typeTypeName string) *Cell { c := &Cell{ typeTypeName: typeTypeName, } if state.StateValue != nil && state.StateValue.HasValue { c.hasValue = true c.buf = state.StateValue.Value } return c } // SeekToBeginning resets the cell so the next // read starts from the beginning of the underlying // buffer, regardless of where the last read left off func (c *Cell) SeekToBeginning() { c.off = 0 } // Read reads up to len(p) bytes into p. It returns the number of bytes // read (0 <= n <= len(p)) and any error encountered. Read is resumable // and returns EOF when there are no more bytes to read. This behavior // is required for Cell to interoperate with the go standard library. // Users of Cell are required to call SeekToBeginning, before the first // read to ensure reads always begin at the start of the buffer. func (c *Cell) Read(p []byte) (n int, err error) { if c.empty() { if len(p) == 0 { return 0, nil } return 0, io.EOF } n = copy(p, c.buf[c.off:]) c.off += n return n, nil } // Reset resets the cell to be empty. // This method must be called when // setting a new value in storage // to ensure the new value overrides // the previous. func (c *Cell) Reset() { c.buf = c.buf[:0] c.off = 0 } // Write writes the given slice into the cell. func (c *Cell) Write(p []byte) (n int, err error) { c.mutated = true c.hasValue = true // If buffer is empty, reset to recover space. if len(c.buf) == 0 && c.off != 0 { c.Reset() } m, ok := c.tryReslice(len(p)) if !ok { m = c.grow(len(p)) } return copy(c.buf[m:], p), nil } // Delete marks the value to be deleted and resets the cell to be empty, func (c *Cell) Delete() { c.mutated = true c.hasValue = false c.Reset() } // HasValue returns true if the cell contains a valid value, // if the value is false, calls to Read will consume 0 bytes func (c *Cell) HasValue() bool { return c.hasValue } // GetStateMutation turns the final Cell into a FromFunction_PersistedValueMutation. 
// The new FromFunction_PersistedValueMutation takes ownership of the underlying // buffer and the cell should not be used after this function returns. func (c *Cell) GetStateMutation(name string) *protocol.FromFunction_PersistedValueMutation { if !c.mutated { return nil } mutation := &protocol.FromFunction_PersistedValueMutation{ MutationType: protocol.FromFunction_PersistedValueMutation_DELETE, StateName: name, } if c.hasValue { mutation.MutationType = protocol.FromFunction_PersistedValueMutation_MODIFY mutation.StateValue = &protocol.TypedValue{ Typename: c.typeTypeName, HasValue: true, Value: c.buf, } } return mutation } // empty reports whether the unread portion of the cells buffer is empty. func (c *Cell) empty() bool { return len(c.buf) <= c.off } // len returns the number of bytes of the unread portion of the cells buffer func (c *Cell) len() int { return len(c.buf) - c.off } // tryReslice tries to reslice the cell, so it can // fit n more bytes of data. It returns the index where bytes should // be written and whether it succeeded. func (c *Cell) tryReslice(n int) (int, bool) { if l := len(c.buf); n <= cap(c.buf)-l { c.buf = c.buf[:l+n] return l, true } return 0, false } // grow grows the cell to guarantee space for n more bytes. // It returns the index where bytes should be written. func (c *Cell) grow(n int) int { m := c.len() if c.buf == nil && n <= smallBufferSize { c.buf = make([]byte, n, smallBufferSize) return 0 } capacity := cap(c.buf) if n <= capacity/2-m { // We can slide things down instead of allocating a new // slice. We only need m+n <= capacity to slide, but // we instead let capacity get twice as large so we // don't spend all our time copying. copy(c.buf, c.buf[c.off:]) } else if capacity > maxInt-capacity-n { log.Panic("error: failed to allocate capacity for internal cell. 
required capacity is greater than maximum allocatable space") } else { buf := make([]byte, 2*capacity+n) copy(buf, c.buf[c.off:]) c.buf = buf } c.off = 0 c.buf = c.buf[:m+n] return m }