pulsaradmin/pkg/utils/data.go (472 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package utils // ClusterData information on a cluster type ClusterData struct { Name string `json:"-"` ServiceURL string `json:"serviceUrl"` ServiceURLTls string `json:"serviceUrlTls"` BrokerServiceURL string `json:"brokerServiceUrl"` BrokerServiceURLTls string `json:"brokerServiceUrlTls"` PeerClusterNames []string `json:"peerClusterNames"` AuthenticationPlugin string `json:"authenticationPlugin"` AuthenticationParameters string `json:"authenticationParameters"` BrokerClientTrustCertsFilePath string `json:"brokerClientTrustCertsFilePath"` BrokerClientTLSEnabled bool `json:"brokerClientTlsEnabled"` } // FunctionData information for a Pulsar Function type FunctionData struct { UpdateAuthData bool `json:"updateAuthData"` RetainOrdering bool `json:"retainOrdering"` Watch bool `json:"watch"` AutoAck bool `json:"autoAck"` Parallelism int `json:"parallelism"` WindowLengthCount int `json:"windowLengthCount"` SlidingIntervalCount int `json:"slidingIntervalCount"` MaxMessageRetries int `json:"maxMessageRetries"` TimeoutMs int64 `json:"timeoutMs"` SlidingIntervalDurationMs int64 `json:"slidingIntervalDurationMs"` WindowLengthDurationMs int64 `json:"windowLengthDurationMs"` RAM int64 `json:"ram"` Disk int64 `json:"disk"` CPU float64 `json:"cpu"` SubsName string `json:"subsName"` DeadLetterTopic string `json:"deadLetterTopic"` Key string `json:"key"` State string `json:"state"` TriggerValue string `json:"triggerValue"` TriggerFile string `json:"triggerFile"` Topic string `json:"topic"` UserCodeFile string `json:"-"` FQFN string `json:"fqfn"` Tenant string `json:"tenant"` Namespace string `json:"namespace"` FuncName string `json:"functionName"` InstanceID string `json:"instance_id"` ClassName string `json:"className"` FunctionType string `json:"functionType"` CleanupSubscription bool `json:"cleanupSubscription"` Jar string `json:"jarFile"` Py string `json:"pyFile"` Go string `json:"goFile"` Inputs string `json:"inputs"` TopicsPattern string `json:"topicsPattern"` Output string `json:"output"` ProducerConfig string `json:"producerConfig"` LogTopic string `json:"logTopic"` SchemaType string `json:"schemaType"` CustomSerDeInputs string `json:"customSerdeInputString"` CustomSchemaInput string `json:"customSchemaInputString"` CustomSchemaOutput string `json:"customSchemaOutputString"` InputSpecs string `json:"inputSpecs"` InputTypeClassName string `json:"inputTypeClassName"` OutputSerDeClassName string `json:"outputSerdeClassName"` OutputTypeClassName string `json:"outputTypeClassName"` FunctionConfigFile string `json:"fnConfigFile"` ProcessingGuarantees string `json:"processingGuarantees"` UserConfig string `json:"userConfigString"` RetainKeyOrdering bool `json:"retainKeyOrdering"` BatchBuilder string `json:"batchBuilder"` ForwardSourceMessageProperty bool `json:"forwardSourceMessageProperty"` SubsPosition string `json:"subsPosition"` SkipToLatest bool `json:"skipToLatest"` CustomRuntimeOptions string `json:"customRuntimeOptions"` Secrets string `json:"secretsString"` DestinationFile string `json:"destinationFile"` Path string `json:"path"` RuntimeFlags string `json:"runtimeFlags,omitempty"` FuncConf *FunctionConfig `json:"-"` } // Failure Domain information type FailureDomainData struct { ClusterName string `json:"-"` DomainName string `json:"-"` BrokerList []string `json:"brokers"` } type FailureDomainMap map[string]FailureDomainData // Tenant args type TenantData struct { Name string `json:"-"` AdminRoles []string `json:"adminRoles"` AllowedClusters []string `json:"allowedClusters"` } type SourceData struct { Tenant string `json:"tenant,omitempty"` Namespace string `json:"namespace,omitempty"` Name string `json:"name,omitempty"` SourceType string `json:"sourceType,omitempty"` ProcessingGuarantees string `json:"processingGuarantees,omitempty"` DestinationTopicName string `json:"destinationTopicName,omitempty"` ProducerConfig string `json:"producerConfig,omitempty"` BatchBuilder string `json:"batchBuilder,omitempty"` DeserializationClassName string `json:"deserializationClassName,omitempty"` SchemaType string `json:"schemaType,omitempty"` Parallelism int `json:"parallelism,omitempty"` Archive string `json:"archive,omitempty"` ClassName string `json:"className,omitempty"` SourceConfigFile string `json:"sourceConfigFile,omitempty"` CPU float64 `json:"cpu,omitempty"` RAM int64 `json:"ram,omitempty"` Disk int64 `json:"disk,omitempty"` SourceConfigString string `json:"sourceConfigString,omitempty"` BatchSourceConfigString string `json:"batchSourceConfigString,omitempty"` CustomRuntimeOptions string `json:"customRuntimeOptions,omitempty"` Secrets string `json:"secretsString,omitempty"` SourceConf *SourceConfig `json:"-,omitempty"` InstanceID string `json:"instanceId,omitempty"` UpdateAuthData bool `json:"updateAuthData,omitempty"` RuntimeFlags string `json:"runtimeFlags,omitempty"` } type SinkData struct { UpdateAuthData bool `json:"updateAuthData,omitempty"` RetainOrdering bool `json:"retainOrdering,omitempty"` AutoAck bool `json:"autoAck,omitempty"` Parallelism int `json:"parallelism,omitempty"` RAM int64 `json:"ram,omitempty"` Disk int64 `json:"disk,omitempty"` TimeoutMs int64 `json:"timeoutMs,omitempty"` CPU float64 `json:"cpu,omitempty"` Tenant string `json:"tenant,omitempty"` Namespace string `json:"namespace,omitempty"` Name string `json:"name,omitempty"` SinkType string `json:"sinkType,omitempty"` CleanupSubscription bool `json:"cleanupSubscription"` Inputs string `json:"inputs,omitempty"` TopicsPattern string `json:"topicsPattern,omitempty"` SubsName string `json:"subsName,omitempty"` SubsPosition string `json:"subsPosition,omitempty"` CustomSerdeInputString string `json:"customSerdeInputString,omitempty"` CustomSchemaInputString string `json:"customSchemaInputString,omitempty"` InputSpecs string `json:"inputSpecs,omitempty"` MaxMessageRetries int `json:"maxMessageRetries,omitempty"` DeadLetterTopic string `json:"deadLetterTopic,omitempty"` ProcessingGuarantees string `json:"processingGuarantees,omitempty"` RetainKeyOrdering bool `json:"retainKeyOrdering,omitempty"` Archive string `json:"archive,omitempty"` ClassName string `json:"className,omitempty"` SinkConfigFile string `json:"sinkConfigFile,omitempty"` SinkConfigString string `json:"sinkConfigString,omitempty"` NegativeAckRedeliveryDelayMs int64 `json:"negativeAckRedeliveryDelayMs,omitempty"` CustomRuntimeOptions string `json:"customRuntimeOptions,omitempty"` Secrets string `json:"secretsString,omitempty"` InstanceID string `json:"instanceId,omitempty"` TransformFunction string `json:"transformFunction,omitempty"` TransformFunctionClassName string `json:"transformFunctionClassName,omitempty"` TransformFunctionConfig string `json:"transformFunctionConfig,omitempty"` RuntimeFlags string `json:"runtimeFlags,omitempty"` SinkConf *SinkConfig `json:"-,omitempty"` } // Topic data type PartitionedTopicMetadata struct { Partitions int `json:"partitions"` } type ManagedLedgerInfoLedgerInfo struct { LedgerID int64 `json:"ledgerId"` Entries int64 `json:"entries"` Size int64 `json:"size"` Timestamp int64 `json:"timestamp"` Offloaded bool `json:"isOffloaded"` OffloadedContextUUID string `json:"offloadedContextUuid"` } type ManagedLedgerInfo struct { Version int `json:"version"` CreationDate string `json:"creationDate"` ModificationData string `json:"modificationData"` Ledgers []ManagedLedgerInfoLedgerInfo `json:"ledgers"` TerminatedPosition PositionInfo `json:"terminatedPosition"` Cursors map[string]CursorInfo `json:"cursors"` } type NamespacesData struct { Enable bool `json:"enable"` Unload bool `json:"unload"` NumBundles int `json:"numBundles"` BookkeeperEnsemble int `json:"bookkeeperEnsemble"` BookkeeperWriteQuorum int `json:"bookkeeperWriteQuorum"` MessageTTL int `json:"messageTTL"` BookkeeperAckQuorum int `json:"bookkeeperAckQuorum"` ManagedLedgerMaxMarkDeleteRate float64 `json:"managedLedgerMaxMarkDeleteRate"` ClusterIDs string `json:"clusterIds"` RetentionTimeStr string `json:"retentionTimeStr"` LimitStr string `json:"limitStr"` LimitTime int64 `json:"limitTime"` PolicyStr string `json:"policyStr"` BacklogQuotaType string `json:"backlogQuotaType"` AntiAffinityGroup string `json:"antiAffinityGroup"` Tenant string `json:"tenant"` Cluster string `json:"cluster"` Bundle string `json:"bundle"` Clusters []string `json:"clusters"` } type TopicStats struct { BacklogSize int64 `json:"backlogSize"` MsgCounterIn int64 `json:"msgInCounter"` MsgCounterOut int64 `json:"msgOutCounter"` MsgRateIn float64 `json:"msgRateIn"` MsgRateOut float64 `json:"msgRateOut"` MsgThroughputIn float64 `json:"msgThroughputIn"` MsgThroughputOut float64 `json:"msgThroughputOut"` AverageMsgSize float64 `json:"averageMsgSize"` StorageSize int64 `json:"storageSize"` Publishers []PublisherStats `json:"publishers"` Subscriptions map[string]SubscriptionStats `json:"subscriptions"` Replication map[string]ReplicatorStats `json:"replication"` DeDuplicationStatus string `json:"deduplicationStatus"` } type ProducerAccessMode string const ( ProduceModeShared ProducerAccessMode = "Shared" ProduceModeExclusive = "Exclusive" ProduceModeExclusiveWithFencing = "ExclusiveWithFencing" ProduceModeWaitForExclusive = "WaitForExclusive" ) type PublisherStats struct { AccessModel ProducerAccessMode `json:"accessMode"` ProducerID int64 `json:"producerId"` MsgRateIn float64 `json:"msgRateIn"` MsgThroughputIn float64 `json:"msgThroughputIn"` AverageMsgSize float64 `json:"averageMsgSize"` ChunkedMessageRate float64 `json:"chunkedMessageRate"` IsSupportsPartialProducer bool `json:"supportsPartialProducer"` ProducerName string `json:"producerName"` Address string `json:"address"` ConnectedSince string `json:"connectedSince"` ClientVersion string `json:"clientVersion"` Metadata map[string]string `json:"metadata"` } type SubscriptionStats struct { BlockedSubscriptionOnUnackedMsgs bool `json:"blockedSubscriptionOnUnackedMsgs"` IsReplicated bool `json:"isReplicated"` LastConsumedFlowTimestamp int64 `json:"lastConsumedFlowTimestamp"` LastConsumedTimestamp int64 `json:"lastConsumedTimestamp"` LastAckedTimestamp int64 `json:"lastAckedTimestamp"` MsgRateOut float64 `json:"msgRateOut"` MsgThroughputOut float64 `json:"msgThroughputOut"` MsgRateRedeliver float64 `json:"msgRateRedeliver"` MsgRateExpired float64 `json:"msgRateExpired"` MsgBacklog int64 `json:"msgBacklog"` MsgBacklogNoDelayed int64 `json:"msgBacklogNoDelayed"` MsgDelayed int64 `json:"msgDelayed"` UnAckedMessages int64 `json:"unackedMessages"` SubType string `json:"type"` ActiveConsumerName string `json:"activeConsumerName"` BytesOutCounter int64 `json:"bytesOutCounter"` MsgOutCounter int64 `json:"msgOutCounter"` MessageAckRate float64 `json:"messageAckRate"` ChunkedMessageRate float64 `json:"chunkedMessageRate"` BacklogSize int64 `json:"backlogSize"` EarliestMsgPublishTimeInBacklog int64 `json:"earliestMsgPublishTimeInBacklog"` TotalMsgExpired int64 `json:"totalMsgExpired"` LastExpireTimestamp int64 `json:"lastExpireTimestamp"` LastMarkDeleteAdvancedTimestamp int64 `json:"lastMarkDeleteAdvancedTimestamp"` Consumers []ConsumerStats `json:"consumers"` IsDurable bool `json:"isDurable"` AllowOutOfOrderDelivery bool `json:"allowOutOfOrderDelivery"` ConsumersAfterMarkDeletePosition map[string]string `json:"consumersAfterMarkDeletePosition"` NonContiguousDeletedMessagesRanges int `json:"nonContiguousDeletedMessagesRanges"` NonContiguousDeletedMessagesRangesSrzSize int `json:"nonContiguousDeletedMessagesRangesSerializedSize"` DelayedMessageIndexSizeInBytes int64 `json:"delayedMessageIndexSizeInBytes"` SubscriptionProperties map[string]string `json:"subscriptionProperties"` FilterProcessedMsgCount int64 `json:"filterProcessedMsgCount"` FilterAcceptedMsgCount int64 `json:"filterAcceptedMsgCount"` FilterRejectedMsgCount int64 `json:"filterRejectedMsgCount"` FilterRescheduledMsgCount int64 `json:"filterRescheduledMsgCount"` } type ConsumerStats struct { BlockedConsumerOnUnAckedMsgs bool `json:"blockedConsumerOnUnackedMsgs"` AvailablePermits int `json:"availablePermits"` UnAckedMessages int `json:"unackedMessages"` MsgRateOut float64 `json:"msgRateOut"` MsgThroughputOut float64 `json:"msgThroughputOut"` MsgRateRedeliver float64 `json:"msgRateRedeliver"` ConsumerName string `json:"consumerName"` BytesOutCounter int64 `json:"bytesOutCounter"` MsgOutCounter int64 `json:"msgOutCounter"` MessageAckRate float64 `json:"messageAckRate"` ChunkedMessageRate float64 `json:"chunkedMessageRate"` AvgMessagesPerEntry int `json:"avgMessagesPerEntry"` Address string `json:"address"` ConnectedSince string `json:"connectedSince"` ClientVersion string `json:"clientVersion"` LastAckedTimestamp int64 `json:"lastAckedTimestamp"` LastConsumedTimestamp int64 `json:"lastConsumedTimestamp"` LastConsumedFlowTimestamp int64 `json:"lastConsumedFlowTimestamp"` Metadata map[string]string `json:"metadata"` } type ReplicatorStats struct { Connected bool `json:"connected"` MsgRateIn float64 `json:"msgRateIn"` MsgRateOut float64 `json:"msgRateOut"` MsgThroughputIn float64 `json:"msgThroughputIn"` MsgThroughputOut float64 `json:"msgThroughputOut"` MsgRateExpired float64 `json:"msgRateExpired"` ReplicationBacklog int64 `json:"replicationBacklog"` ReplicationDelayInSeconds int64 `json:"replicationDelayInSeconds"` InboundConnection string `json:"inboundConnection"` InboundConnectedSince string `json:"inboundConnectedSince"` OutboundConnection string `json:"outboundConnection"` OutboundConnectedSince string `json:"outboundConnectedSince"` } type PersistentTopicInternalStats struct { WaitingCursorsCount int `json:"waitingCursorsCount"` PendingAddEntriesCount int `json:"pendingAddEntriesCount"` EntriesAddedCounter int64 `json:"entriesAddedCounter"` NumberOfEntries int64 `json:"numberOfEntries"` TotalSize int64 `json:"totalSize"` CurrentLedgerEntries int64 `json:"currentLedgerEntries"` CurrentLedgerSize int64 `json:"currentLedgerSize"` LastLedgerCreatedTimestamp string `json:"lastLedgerCreatedTimestamp"` LastLedgerCreationFailureTimestamp string `json:"lastLedgerCreationFailureTimestamp"` LastConfirmedEntry string `json:"lastConfirmedEntry"` State string `json:"state"` Ledgers []LedgerInfo `json:"ledgers"` Cursors map[string]CursorStats `json:"cursors"` SchemaLedgers []SchemaLedger `json:"schemaLedgers"` CompactedLedger CompactedLedger `json:"compactedLedger"` } type LedgerInfo struct { LedgerID int64 `json:"ledgerId"` Entries int64 `json:"entries"` Size int64 `json:"size"` Timestamp int64 `json:"timestamp"` Offloaded bool `json:"offloaded"` MetaData string `json:"metadata"` UnderReplicated bool `json:"underReplicated"` } type CursorInfo struct { Version int `json:"version"` CreationDate string `json:"creationDate"` ModificationDate string `json:"modificationDate"` CursorsLedgerID int64 `json:"cursorsLedgerId"` MarkDelete PositionInfo `json:"markDelete"` IndividualDeletedMessages []MessageRangeInfo `json:"individualDeletedMessages"` Properties map[string]int64 } type PositionInfo struct { LedgerID int64 `json:"ledgerId"` EntryID int64 `json:"entryId"` } type MessageRangeInfo struct { From PositionInfo `json:"from"` To PositionInfo `json:"to"` Offloaded bool `json:"offloaded"` } type CursorStats struct { MarkDeletePosition string `json:"markDeletePosition"` ReadPosition string `json:"readPosition"` WaitingReadOp bool `json:"waitingReadOp"` PendingReadOps int `json:"pendingReadOps"` MessagesConsumedCounter int64 `json:"messagesConsumedCounter"` CursorLedger int64 `json:"cursorLedger"` CursorLedgerLastEntry int64 `json:"cursorLedgerLastEntry"` IndividuallyDeletedMessages string `json:"individuallyDeletedMessages"` LastLedgerWitchTimestamp string `json:"lastLedgerWitchTimestamp"` State string `json:"state"` NumberOfEntriesSinceFirstNotAckedMessage int64 `json:"numberOfEntriesSinceFirstNotAckedMessage"` TotalNonContiguousDeletedMessagesRange int `json:"totalNonContiguousDeletedMessagesRange"` Properties map[string]int64 `json:"properties"` } type PartitionedTopicStats struct { MsgRateIn float64 `json:"msgRateIn"` MsgRateOut float64 `json:"msgRateOut"` MsgThroughputIn float64 `json:"msgThroughputIn"` MsgThroughputOut float64 `json:"msgThroughputOut"` AverageMsgSize float64 `json:"averageMsgSize"` StorageSize int64 `json:"storageSize"` Publishers []PublisherStats `json:"publishers"` Subscriptions map[string]SubscriptionStats `json:"subscriptions"` Replication map[string]ReplicatorStats `json:"replication"` DeDuplicationStatus string `json:"deduplicationStatus"` Metadata PartitionedTopicMetadata `json:"metadata"` Partitions map[string]TopicStats `json:"partitions"` } type SchemaData struct { Version int64 `json:"version"` Filename string `json:"filename"` Jar string `json:"jar"` Type string `json:"type"` Classname string `json:"classname"` AlwaysAllowNull bool `json:"alwaysAllowNull"` DryRun bool `json:"dryRun"` } type LookupData struct { BrokerURL string `json:"brokerUrl"` BrokerURLTLS string `json:"brokerUrlTls"` HTTPURL string `json:"httpUrl"` HTTPURLTLS string `json:"httpUrlTls"` } type NsIsolationPoliciesData struct { Namespaces []string `json:"namespaces"` Primary []string `json:"primary"` Secondary []string `json:"secondary"` AutoFailoverPolicyTypeName string `json:"autoFailoverPolicyTypeName"` AutoFailoverPolicyParams string `json:"autoFailoverPolicyParams"` } type BrokerData struct { URL string `json:"brokerUrl"` ConfigName string `json:"configName"` ConfigValue string `json:"configValue"` } type BrokerStatsData struct { Indent bool `json:"indent"` } type ResourceQuotaData struct { Names string `json:"names"` Bundle string `json:"bundle"` MsgRateIn int64 `json:"msgRateIn"` MsgRateOut int64 `json:"msgRateOut"` BandwidthIn int64 `json:"bandwidthIn"` BandwidthOut int64 `json:"bandwidthOut"` Memory int64 `json:"memory"` Dynamic bool `json:"dynamic"` } type PersistenceData struct { BookkeeperEnsemble int64 `json:"bookkeeperEnsemble"` BookkeeperWriteQuorum int64 `json:"bookkeeperWriteQuorum"` BookkeeperAckQuorum int64 `json:"bookkeeperAckQuorum"` ManagedLedgerMaxMarkDeleteRate float64 `json:"managedLedgerMaxMarkDeleteRate"` } type DelayedDeliveryCmdData struct { Enable bool `json:"enable"` Disable bool `json:"disable"` DelayedDeliveryTimeStr string `json:"delayedDeliveryTimeStr"` } type DelayedDeliveryData struct { TickTime float64 `json:"tickTime"` Active bool `json:"active"` } type DispatchRateData struct { DispatchThrottlingRateInMsg int64 `json:"dispatchThrottlingRateInMsg"` DispatchThrottlingRateInByte int64 `json:"dispatchThrottlingRateInByte"` RatePeriodInSecond int64 `json:"ratePeriodInSecond"` RelativeToPublishRate bool `json:"relativeToPublishRate"` } type PublishRateData struct { PublishThrottlingRateInMsg int64 `json:"publishThrottlingRateInMsg"` PublishThrottlingRateInByte int64 `json:"publishThrottlingRateInByte"` } type SchemaLedger struct { LedgerID int64 `json:"ledgerId"` Entries int64 `json:"entries"` Size int64 `json:"size"` Timestamp int64 `json:"timestamp"` IsOffloaded bool `json:"isOffloaded"` } type CompactedLedger struct { LedgerID int64 `json:"ledgerId"` Entries int64 `json:"entries"` Size int64 `json:"size"` Offloaded bool `json:"offloaded"` UnderReplicated bool `json:"underReplicated"` } type GetStatsOptions struct { GetPreciseBacklog bool `json:"get_precise_backlog"` SubscriptionBacklogSize bool `json:"subscription_backlog_size"` GetEarliestTimeInBacklog bool `json:"get_earliest_time_in_backlog"` ExcludePublishers bool `json:"exclude_publishers"` ExcludeConsumers bool `json:"exclude_consumers"` } type BrokerInfo struct { BrokerID string `json:"brokerId"` ServiceURL string `json:"serviceUrl"` } type TopicVersion string const ( TopicVersionV1 TopicVersion = "V1" TopicVersionV2 TopicVersion = "V2" ) func (t TopicVersion) String() string { return string(t) }