in linkis-engineconn-plugins/hbase/hbase-shims-1.2.0/src/main/resources/hbase-ruby/hbase/admin.rb [253:368]
def create(table_name, *args)
  raise(ArgumentError, "Table name must be of type String") unless table_name.kind_of?(String)
  args = args.flatten.compact
  has_columns = false
  htd = org.apache.hadoop.hbase.HTableDescriptor.new(org.apache.hadoop.hbase.TableName.valueOf(table_name))
  splits = nil
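  # Each remaining arg is either a column family spec (a String, or a Hash
  # carrying NAME) or a Hash of table-level options.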
  args.each do |arg|
    unless arg.kind_of?(String) || arg.kind_of?(Hash)
      raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash or String type")
    end
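    # Column family case: add the descriptor, or replace it if the family
    # was already declared.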
    if arg.kind_of?(String) or arg.has_key?(NAME)
      descriptor = hcd(arg, htd)
      if htd.hasFamily(descriptor.getName)
        puts "Family '" + descriptor.getNameAsString() + "' already exists, the old one will be replaced"
        htd.modifyFamily(descriptor)
      else
        htd.addFamily(descriptor)
      end
      has_columns = true
      next
    end
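    # Otherwise arg is a Hash of table-level options; recognized keys are
    # deleted as they are consumed so leftovers can be reported at the end.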
    if arg.has_key?(REGION_REPLICATION)
      region_replication = JInteger.valueOf(arg.delete(REGION_REPLICATION))
      htd.setRegionReplication(region_replication)
    end
    if (method = arg.delete(METHOD))
      raise(ArgumentError, "table_att is currently the only supported method") unless method == 'table_att'
    end
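    # SPLITS_FILE: read one split key per line and hand the list off as SPLITS.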
    if arg.has_key?(SPLITS_FILE)
      splits_file = arg.delete(SPLITS_FILE)
      unless File.exist?(splits_file)
        raise(ArgumentError, "Splits file #{splits_file} doesn't exist")
      end
      arg[SPLITS] = []
      File.foreach(splits_file) do |line|
        arg[SPLITS].push(line.chomp)
      end
      # Record the source file path on the descriptor. SPLITS_FILE was already
      # deleted from arg above, so the previous arg[SPLITS_FILE] lookup here
      # could only ever yield nil.
      htd.setValue(SPLITS_FILE, splits_file)
    end
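    # Explicit SPLITS become a Java byte[][] of split keys; otherwise
    # NUMREGIONS plus SPLITALGO ask RegionSplitter to compute them.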
    if arg.has_key?(SPLITS)
      splits = Java::byte[][arg[SPLITS].size].new
      idx = 0
      arg.delete(SPLITS).each do |split|
        splits[idx] = org.apache.hadoop.hbase.util.Bytes.toBytesBinary(split)
        idx = idx + 1
      end
    elsif arg.has_key?(NUMREGIONS) or arg.has_key?(SPLITALGO)
      raise(ArgumentError, "Number of regions must be specified") unless arg.has_key?(NUMREGIONS)
      raise(ArgumentError, "Split algorithm must be specified") unless arg.has_key?(SPLITALGO)
      raise(ArgumentError, "Number of regions must be greater than 1") unless arg[NUMREGIONS] > 1
      num_regions = arg.delete(NUMREGIONS)
      split_algo = RegionSplitter.newSplitAlgoInstance(@conf, arg.delete(SPLITALGO))
      splits = split_algo.split(JInteger.valueOf(num_regions))
    end
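    # Remaining keys set table-wide attributes; JInteger/JLong/JBoolean are
    # shortcuts for the boxed Java types defined elsewhere in hbase-ruby.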
    htd.setOwnerString(arg.delete(OWNER)) if arg[OWNER]
    htd.setMaxFileSize(JLong.valueOf(arg.delete(MAX_FILESIZE))) if arg[MAX_FILESIZE]
    htd.setReadOnly(JBoolean.valueOf(arg.delete(READONLY))) if arg[READONLY]
    htd.setCompactionEnabled(JBoolean.valueOf(arg.delete(COMPACTION_ENABLED))) if arg[COMPACTION_ENABLED]
    htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE]
    if arg.include?(DEFERRED_LOG_FLUSH)
      if arg.delete(DEFERRED_LOG_FLUSH).to_s.upcase == "TRUE"
        htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf("ASYNC_WAL"))
      else
        htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf("SYNC_WAL"))
      end
    end
    htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(DURABILITY))) if arg[DURABILITY]
    htd.setFlushPolicyClassName(arg.delete(FLUSH_POLICY)) if arg[FLUSH_POLICY]
    htd.setRegionSplitPolicyClassName(arg.delete(SPLIT_POLICY)) if arg[SPLIT_POLICY]
    htd.setRegionMemstoreReplication(JBoolean.valueOf(arg.delete(REGION_MEMSTORE_REPLICATION))) if arg[REGION_MEMSTORE_REPLICATION]
    htd.setRegionReplication(JInteger.valueOf(arg.delete(REGION_REPLICATION))) if arg[REGION_REPLICATION]
    parse_htd_args(htd, arg)
    set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
    set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
    arg.each_key do |ignored_key|
      puts("An argument ignored (unknown or overridden): %s" % [ ignored_key ])
    end
  end
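  # Require at least one column family, then create the table, pre-split only
  # when split keys were supplied or computed.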
  raise(ArgumentError, "Table must have at least one column family") unless has_columns
  if splits.nil?
    @admin.createTable(htd)
  else
    @admin.createTable(htd, splits)
  end
end
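
# Illustrative shell invocations this method serves (a sketch, not exhaustive;
# NAME, VERSIONS, SPLITS, NUMREGIONS and SPLITALGO are constants defined
# elsewhere in hbase-ruby):
#   create 'tbl', 'cf1', 'cf2'
#   create 'tbl', {NAME => 'cf1', VERSIONS => 5}
#   create 'tbl', 'cf1', SPLITS => ['10', '20', '30']
#   create 'tbl', 'cf1', NUMREGIONS => 16, SPLITALGO => 'HexStringSplit'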