in src/main/java/org/apache/sysds/runtime/iogen/template/MatrixGenerateReader.java [98:257]
/**
 * Determines the number of rows stored in the given text splits and allocates the output matrix block.
 *
 * <p>How rows are counted depends on the row-index structure of {@code _props}:
 * <ul>
 * <li>{@code Identity}: every record (line) of every split is one row.</li>
 * <li>{@code CellWiseExist} / {@code RowWiseExist}: the row count is the largest row index parsed from any line;
 * lines whose index cannot be located or parsed contribute 0. NOTE(review): this treats the largest index as the
 * row count, i.e. presumably assumes 1-based row indices — confirm.</li>
 * <li>{@code SeqScatter}: rows are records delimited by the begin/end sequence tokens. As a side effect,
 * {@code _offsets} receives, per split, the record positions, the cumulative row offset and the text preceding
 * the first begin token.</li>
 * </ul>
 * For any other index property the row count stays 0. Splits are processed sequentially.
 *
 * @param informat text input format used to open a record reader per split
 * @param job      job configuration handed to the record readers
 * @param splits   input splits to scan
 * @param estnnz   estimated number of non-zeros, forwarded to {@code createOutputMatrixBlock}
 * @return the output block created for the computed row count and {@code _props.getNcols()} columns
 * @throws IOException         if scanning fails; every failure during scanning is wrapped in an IOException
 * @throws DMLRuntimeException declared for callers; presumably raised by {@code createOutputMatrixBlock}
 */
private MatrixBlock computeSizeAndCreateOutputMatrixBlock(TextInputFormat informat, JobConf job, InputSplit[] splits,
	long estnnz) throws IOException, DMLRuntimeException {
	int row = 0;
	try {
		RowIndexStructure rowIndex = _props.getRowIndexStructure();
		if(rowIndex.getProperties() == RowIndexStructure.IndexProperties.Identity) {
			row = countTextRecords(informat, job, splits);
		}
		else if(rowIndex.getProperties() == RowIndexStructure.IndexProperties.CellWiseExist ||
			rowIndex.getProperties() == RowIndexStructure.IndexProperties.RowWiseExist) {
			row = findMaxRowIndexValue(informat, job, splits, rowIndex);
		}
		else if(rowIndex.getProperties() == RowIndexStructure.IndexProperties.SeqScatter) {
			_offsets = new TemplateUtil.SplitOffsetInfos(splits.length);
			String beginToken = rowIndex.getSeqBeginString();
			String endToken = rowIndex.getSeqEndString();
			for(int splitIndex = 0; splitIndex < splits.length; splitIndex++) {
				InputSplit inputSplit = splits[splitIndex];
				TemplateUtil.SplitInfo splitInfo = new TemplateUtil.SplitInfo();
				ArrayList<Pair<Long, Integer>> beginIndexes =
					TemplateUtil.getTokenIndexOnMultiLineRecords(inputSplit, informat, job, beginToken).getKey();
				ArrayList<Pair<Long, Integer>> endIndexes;
				int tokenLength = 0;
				boolean diffBeginEndToken = !beginToken.equals(endToken);
				if(diffBeginEndToken) {
					endIndexes =
						TemplateUtil.getTokenIndexOnMultiLineRecords(inputSplit, informat, job, endToken).getKey();
					// stored end positions point past the closing token
					tokenLength = endToken.length();
				}
				else {
					// a shared token: each record ends where the next begin token occurs
					endIndexes = new ArrayList<>();
					for(int k = 1; k < beginIndexes.size(); k++)
						endIndexes.add(beginIndexes.get(k));
				}
				// NOTE(review): the last begin position is always dropped; presumably it has no matching end
				// inside this split — confirm against TemplateUtil.getTokenIndexOnMultiLineRecords
				beginIndexes.remove(beginIndexes.size() - 1);

				int i = 0;
				int j = 0;
				// an end token located before the first begin token closes a record from an earlier split
				if(isTokenBefore(endIndexes.get(0), beginIndexes.get(0)))
					j++;
				int nrows = 0;
				while(i < beginIndexes.size() && j < endIndexes.size()) {
					Pair<Long, Integer> p1 = beginIndexes.get(i);
					Pair<Long, Integer> p2 = endIndexes.get(j);
					// count the begin tokens that occur before the current end token
					int n = 0;
					while(isTokenBefore(p1, p2)) {
						n++;
						i++;
						if(i == beginIndexes.size())
							break;
						p1 = beginIndexes.get(i);
					}
					// pair the first of those n begin tokens with the (n-1)-th following end token
					j += n - 1;
					Pair<Long, Integer> recordBegin = beginIndexes.get(i - n);
					Pair<Long, Integer> recordEnd = endIndexes.get(j);
					splitInfo.addIndexAndPosition(recordBegin.getKey(), recordEnd.getKey(), recordBegin.getValue(),
						recordEnd.getValue() + tokenLength);
					j++;
					nrows++;
				}
				// shared token: one more record when end positions remain after all begin positions are used
				if(!diffBeginEndToken && i == beginIndexes.size() && j < endIndexes.size())
					nrows++;

				Pair<Long, Integer> firstBegin = beginIndexes.get(0);
				if(firstBegin.getKey() == 0 && firstBegin.getValue() == 0)
					splitInfo.setRemainString("");
				else
					splitInfo.setRemainString(readSplitPrefix(informat, job, inputSplit, firstBegin));

				splitInfo.setNrows(nrows);
				_offsets.setSeqOffsetPerSplit(splitIndex, splitInfo);
				_offsets.setOffsetPerSplit(splitIndex, row);
				row += nrows;
			}
		}
	}
	catch(Exception e) {
		// every failure is rewrapped; the "Thread pool" wording is historical (scanning here is sequential)
		throw new IOException("Thread pool Error " + e.getMessage(), e);
	}
	return createOutputMatrixBlock(row, _props.getNcols(), row, estnnz, true, false);
}

/**
 * Counts the records returned by a record reader over all given splits.
 * Each reader is closed even if reading fails.
 */
private static int countTextRecords(TextInputFormat informat, JobConf job, InputSplit[] splits)
	throws IOException {
	int count = 0;
	LongWritable key = new LongWritable();
	Text value = new Text();
	for(InputSplit split : splits) {
		RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
		try {
			while(reader.next(key, value))
				count++;
		}
		finally {
			IOUtilFunctions.closeSilently(reader);
		}
	}
	return count;
}

/**
 * Returns the largest row index parsed from any line of the given splits, or 0 if none parses to a positive value.
 * Each reader is closed even if reading fails.
 */
private static int findMaxRowIndexValue(TextInputFormat informat, JobConf job, InputSplit[] splits,
	RowIndexStructure rowIndex) throws IOException {
	int maxRow = 0;
	LongWritable key = new LongWritable();
	Text value = new Text();
	for(InputSplit split : splits) {
		RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
		try {
			while(reader.next(key, value))
				maxRow = Math.max(maxRow, parseRowIndexValue(value.toString(), rowIndex));
		}
		finally {
			IOUtilFunctions.closeSilently(reader);
		}
	}
	return maxRow;
}

/**
 * Extracts the row index from a single text line.
 *
 * <p>The key patterns are searched in order, each search starting at the previous match. The value starts
 * directly after the last pattern and ends at the position returned by {@code TemplateUtil.getEndPos}. With no
 * patterns, or a single empty pattern, the value starts at the beginning of the line.
 *
 * @return the parsed index, or 0 if a key pattern is missing or the value is not an integer
 */
private static int parseRowIndexValue(String line, RowIndexStructure rowIndex) {
	int patternCount = rowIndex.getKeyPattern().size();
	int startPos = 0;
	if(patternCount > 0) {
		int index = 0;
		for(int k = 0; k < patternCount && index != -1; k++)
			index = line.indexOf(rowIndex.getKeyPattern().get(k), index);
		if(index == -1)
			return 0;
		startPos = index + rowIndex.getKeyPattern().get(patternCount - 1).length();
	}
	int endPos = TemplateUtil.getEndPos(line, line.length(), startPos, rowIndex.endWithValueString());
	try {
		// parse only the value itself, not the key-pattern prefix in front of it
		return Integer.parseInt(line.substring(startPos, endPos));
	}
	catch(NumberFormatException | IndexOutOfBoundsException ex) {
		return 0;
	}
}

/**
 * Returns true if token position {@code a} lies strictly before {@code b}, ordered by line, then by offset.
 * Compares boxed line numbers by value (a boxed {@code ==} would compare references).
 */
private static boolean isTokenBefore(Pair<Long, Integer> a, Pair<Long, Integer> b) {
	int byLine = Long.compare(a.getKey(), b.getKey());
	return byLine < 0 || (byLine == 0 && a.getValue() < b.getValue());
}

/**
 * Reads the text of a split that precedes the given first begin-token position.
 * Lines are concatenated without separators. The reader is always closed.
 */
private static String readSplitPrefix(TextInputFormat informat, JobConf job, InputSplit split,
	Pair<Long, Integer> firstBegin) throws IOException {
	RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
	try {
		LongWritable key = new LongWritable();
		Text value = new Text();
		StringBuilder sb = new StringBuilder();
		for(long line = 0; line < firstBegin.getKey(); line++) {
			reader.next(key, value);
			sb.append(value.toString());
		}
		if(firstBegin.getValue() != 0) {
			reader.next(key, value);
			sb.append(value.toString().substring(0, firstBegin.getValue()));
		}
		return sb.toString();
	}
	finally {
		IOUtilFunctions.closeSilently(reader);
	}
}