benchmarks/perf_get_records.py (69 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import time
from datahub import DataHub
from datahub.models import CursorType
class Timer(object):
    """Context manager that measures the elapsed time of its ``with`` body.

    After the ``with`` block exits, ``start``/``end`` hold the clock readings
    taken on entry/exit, ``secs`` holds the elapsed time in seconds and
    ``msecs`` the same duration in milliseconds.  ``start``/``end`` are
    readings of a monotonic performance counter where available, so only
    their difference is meaningful (they are not epoch timestamps).
    """

    # perf_counter is monotonic and high-resolution, so a benchmark is not
    # skewed by system clock adjustments or coarse clock ticks; fall back to
    # time.time on interpreters that lack it (Python < 3.3).
    _clock = staticmethod(getattr(time, 'perf_counter', time.time))

    def __init__(self, verbose=False):
        # When True, the elapsed time is printed as the block exits.
        self.verbose = verbose

    def __enter__(self):
        self.start = self._clock()
        return self

    def __exit__(self, *args):
        # Exceptions raised in the body are not suppressed (implicit None return).
        self.end = self._clock()
        self.secs = self.end - self.start
        self.msecs = self.secs * 1000  # millisecs
        if self.verbose:
            print('elapsed time: %f ms' % self.msecs)
def str2bool(value):
    """Convert a command-line string such as ``true``/``False``/``1`` to a bool.

    ``argparse`` with ``type=bool`` treats every non-empty string (including
    ``"False"``) as True, so boolean-valued options need an explicit parser.
    An empty string maps to False, matching what ``bool('')`` used to return.

    Raises:
        argparse.ArgumentTypeError: if *value* is not a recognised boolean word.
    """
    if isinstance(value, bool):
        return value
    lowered = value.strip().lower()
    if lowered in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if lowered in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError('expected a boolean value, got %r' % (value,))


def mask_secret(secret, visible=4):
    """Return *secret* with all but its last *visible* characters replaced by ``*``.

    Secrets no longer than ``2 * visible`` characters are masked completely so
    that most of a short key is never revealed.  Empty/None input is returned
    unchanged.
    """
    if not secret:
        return secret
    if len(secret) <= 2 * visible:
        return '*' * len(secret)
    return '*' * (len(secret) - visible) + secret[-visible:]


def rate_per_sec(count, secs):
    """Return ``count / secs`` as a float, or NaN when no time elapsed.

    Guards the throughput report against a ZeroDivisionError if the timer
    measured zero elapsed time.
    """
    if secs <= 0:
        return float('nan')
    return float(count) / secs


def build_parser():
    """Build the command-line parser for this benchmark."""
    parser = argparse.ArgumentParser(
        description='Benchmark DataHub get_tuple_records throughput on one shard.')
    parser.add_argument('access_id', help='account access id')
    parser.add_argument('access_key', help='account access key')
    parser.add_argument('endpoint', help='datahub server endpoint')
    parser.add_argument('--batch', help='batch record num', type=int, default=100)
    parser.add_argument('--round', help='round num', type=int, default=10000)
    parser.add_argument('--project', help='project name', default='py_perf_test_project')
    parser.add_argument('--topic', help='topic name', default='py_perf_test_topic')
    parser.add_argument('--shard', help='shard id to read from', default='0')
    parser.add_argument('--retry_times', help='request retry nums', type=int, default=3)
    parser.add_argument('--conn_timeout', help='connect timeout', type=int, default=5)
    parser.add_argument('--read_timeout', help='read timeout', type=int, default=120)
    # NOTE(review): --stream and --protobuf are only echoed in the configuration
    # summary below; this script does not pass them to DataHub.
    parser.add_argument('--stream', help='stream mode', action="store_true")
    # Accepts "--protobuf", "--protobuf true" and "--protobuf false"; the old
    # type=bool parsed "--protobuf False" as True.
    parser.add_argument('--protobuf', help='protobuf mode', type=str2bool,
                        nargs='?', const=True, default=False)
    return parser


def main(argv=None):
    """Read records from the oldest cursor of one shard and report throughput.

    Args:
        argv: argument list to parse; defaults to ``sys.argv[1:]``.
    """
    args = build_parser().parse_args(argv)

    print("=============configuration=============")
    print("access_id:%s" % args.access_id)
    # Never echo the secret key in full to the console/logs.
    print("access_key:%s" % mask_secret(args.access_key))
    print("endpoint:%s" % args.endpoint)
    print("project:%s" % args.project)
    print("topic:%s" % args.topic)
    print("shard:%s" % args.shard)
    print("retry_times:%d" % args.retry_times)
    print("conn_timeout:%d" % args.conn_timeout)
    print("read_timeout:%d" % args.read_timeout)
    print("batch record num:%d" % args.batch)
    print("round num:%d" % args.round)
    print("stream:%s" % args.stream)
    print("protobuf:%s" % args.protobuf)
    print("=======================================\n\n")

    dh = DataHub(args.access_id, args.access_key, args.endpoint, retry_times=args.retry_times,
                 conn_timeout=args.conn_timeout, read_timeout=args.read_timeout)

    topic_result = dh.get_topic(args.project, args.topic)
    print("get topic %s success! detail:\n%s" % (args.topic, str(topic_result)))
    print("=======================================\n\n")

    cursor_result = dh.get_cursor(args.project, args.topic, args.shard, CursorType.OLDEST)
    print("get topic %s oldest cursor success! detail:\n%s" % (args.topic, cursor_result.cursor))
    print("=======================================\n\n")

    read_request_count = 0
    read_suc_record_count = 0
    cursor = cursor_result.cursor
    with Timer() as t:
        for _ in range(args.round):
            record_result = dh.get_tuple_records(topic_result.project_name, topic_result.topic_name,
                                                 args.shard, topic_result.record_schema, cursor,
                                                 args.batch)
            read_request_count += 1
            read_suc_record_count += record_result.record_count
            # An empty batch is treated as "no more data": stop early.
            if record_result.record_count == 0:
                break
            cursor = record_result.next_cursor

    print("===============result==================")
    print("read_request_count:%d, %f/s" % (read_request_count,
                                           rate_per_sec(read_request_count, t.secs)))
    print("read_suc_record_count:%d, %f/s" % (read_suc_record_count,
                                              rate_per_sec(read_suc_record_count, t.secs)))
    print("=> elapsed time: %fs" % t.secs)


if __name__ == '__main__':
    main()