data/rand-many-types/generate.py (156 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import pyarrow as pa import numpy as np import string from decimal import Decimal from datetime import datetime, timedelta def generate_random_data(data_type, num_rows, random_generator): rng = random_generator if pa.types.is_int8(data_type): return pa.array(rng.integers(-128, 127, num_rows, dtype=np.int8)) elif pa.types.is_int16(data_type): return pa.array(rng.integers(-32768, 32767, num_rows, dtype=np.int16)) elif pa.types.is_int32(data_type): return pa.array( rng.integers(-2147483648, 2147483647, num_rows, dtype=np.int32) ) elif pa.types.is_int64(data_type): return pa.array( rng.integers( -9223372036854775808, 9223372036854775807, num_rows, dtype=np.int64, ) ) elif pa.types.is_uint8(data_type): return pa.array(rng.integers(0, 255, num_rows, dtype=np.uint8)) elif pa.types.is_uint16(data_type): return pa.array(rng.integers(0, 65535, num_rows, dtype=np.uint16)) elif pa.types.is_uint32(data_type): return pa.array(rng.integers(0, 4294967295, num_rows, dtype=np.uint32)) elif pa.types.is_uint64(data_type): return pa.array( rng.integers(0, 18446744073709551615, num_rows, dtype=np.uint64) ) elif pa.types.is_float32(data_type): return pa.array(rng.random(num_rows, np.float32)) elif pa.types.is_float64(data_type): return pa.array(rng.random(num_rows, np.float64)) elif pa.types.is_string(data_type): charset = list( string.ascii_lowercase + string.ascii_uppercase + string.digits ) return pa.array( ["".join(rng.choice(charset, 8)) for _ in range(num_rows)] ) elif pa.types.is_binary(data_type): return pa.array([rng.bytes(8) for _ in range(num_rows)]) elif pa.types.is_boolean(data_type): return pa.array(rng.choice([True, False], num_rows)) elif pa.types.is_date32(data_type): base_date = datetime(1970, 1, 1) return pa.array( [ (base_date + timedelta(days=int(rng.integers(0, 10000)))).date() for _ in range(num_rows) ], type=pa.date32(), ) elif pa.types.is_date64(data_type): base_date = datetime(1970, 1, 1) return pa.array( [ ( base_date + timedelta( milliseconds=int( rng.integers(0, 10000 * 24 * 60 * 60 * 1000) ) ) ).date() for _ in range(num_rows) ], type=pa.date64(), ) elif pa.types.is_timestamp(data_type): base_time = datetime(2016, 1, 1, 0, 0, 0, 0) return pa.array( [ base_time + timedelta(seconds=int(rng.integers(0, 10000))) for _ in range(num_rows) ], type=pa.timestamp("ns"), ) elif pa.types.is_decimal(data_type): return pa.array( [ Decimal( f"{rng.integers(10**7, 10**8-1)}.{rng.integers(0, 10**2-1)}" ) for _ in range(num_rows) ], type=pa.decimal128(10, 2), ) elif pa.types.is_list(data_type): return pa.array( [[rng.integers(0, 100) for _ in range(3)] for _ in range(num_rows)], type=pa.list_(pa.int32()), ) elif pa.types.is_struct(data_type): struct_type = pa.struct( [("field1", pa.int32()), ("field2", pa.float64())] ) return pa.array( [ {"field1": rng.integers(0, 100), "field2": rng.random()} for _ in range(num_rows) ], type=struct_type, ) elif pa.types.is_dictionary(data_type): return pa.array( [f"key_{i}" for i in range(num_rows)], type=pa.dictionary(pa.int32(), pa.string()), ) else: return pa.nulls(num_rows, type=data_type) data_types = [ pa.int8(), pa.int16(), pa.int32(), pa.int64(), pa.uint8(), pa.uint16(), pa.uint32(), pa.uint64(), pa.float32(), pa.float64(), pa.string(), pa.binary(), pa.bool_(), pa.date32(), pa.date64(), pa.timestamp("ns"), pa.decimal128(10, 2), pa.list_(pa.int32()), pa.struct([("field1", pa.int32()), ("field2", pa.float64())]), pa.dictionary(pa.int32(), pa.string()), pa.null(), ] schema = pa.schema( [(f"col_{j}", data_type) for j, data_type in enumerate(data_types)] ) num_rows_per_batch = 1000 num_batches = 100 random_seed = 12345 random_generator = np.random.default_rng(random_seed) path = "random.arrows" with pa.ipc.new_stream(path, schema) as writer: for i in range(0, num_batches): columns = { f"col_{j}": generate_random_data( data_type, num_rows_per_batch, random_generator ) for j, data_type in enumerate(data_types) } writer.write_batch(pa.RecordBatch.from_pydict(columns))