client/pkg/multicast/publisher.go (48 lines of code) (raw):
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package multicast
import (
"fmt"
"github.com/GoogleCloudPlatform/datashare-toolkit/client/internal/pubsubutil"
log "github.com/sirupsen/logrus"
"strings"
)
// Publish sets the UDPConn max datagram size and writes packets received
// to a PubSub Topic. Returns error
func (c *Client) Publish() error {
if err := c.Conn.SetReadBuffer(c.ReadBufferBytes); err != nil {
return fmt.Errorf("c.Conn.SetReadBuffer: %s", err)
}
for {
buffer := make([]byte, maxDatagramSize)
numBytes, _, err := c.Conn.ReadFromUDP(buffer)
if err != nil {
return fmt.Errorf("c.Conn.ReadFromUDP: %s", err)
}
c.Counter.totalReceivedMessages++
// publish the messages in routine
raw := buffer[:numBytes]
// TODO add source as custom metadata
if res := pubsubutil.PublishMessage(c.Topic, raw); res == nil {
return fmt.Errorf("pubsubutil.PublishMessage did not have a response")
}
// TODO add Ready state in routine to check message id success
c.Counter.totalPublishedMessages++
}
return nil
}
// PublishS sets the UDPConn max datagram size and writes packets received
// as string to a PubSub Topic . Returns error
func (c *Client) PublishS() error {
if err := c.Conn.SetReadBuffer(c.ReadBufferBytes); err != nil {
return fmt.Errorf("c.Conn.SetReadBuffer: %s", err)
}
for {
buffer := make([]byte, maxDatagramSize)
numBytes, src, err := c.Conn.ReadFromUDP(buffer)
if err != nil {
return fmt.Errorf("c.Conn.ReadFromUDP: %s", err)
}
c.Counter.totalReceivedMessages++
log.Debugf("'%d' bytes read from '%s'", numBytes, src)
// publish the messages
raw := string(buffer[:numBytes])
raw = strings.TrimSuffix(raw, "\n")
// TODO add source as custom metadata
_, err = pubsubutil.PublishMessageS(c.Topic, raw)
if err != nil {
return fmt.Errorf("pubsubutil.PublishMessage: %s", err.Error())
}
c.Counter.totalPublishedMessages++
}
return nil
}