onData method
Implementation
/// Processes a single parsed CSV row of a Flux query response.
///
/// Depending on the row content and the current parser state, the row
/// either updates the definition of the current table (annotation rows and
/// the column-name row) or is converted into a record with `table.toObject`
/// and added to `_controller`.
///
/// Throws [FluxQueryException] for the row that follows an
/// `error`/`reference` header row. Throws [FluxCsvParserException] when a row
/// arrives before any table definition exists, or when an error row or data
/// row has fewer than the three cells that are read from it.
void onData(List csv) {
  // skip empty line
  if (csv.isEmpty || (csv.length == 1 && csv[0] == '')) {
    return;
  }
  // check error found in csv; csv[2] is read, so three cells are required
  if (csv.length > 2 && 'error' == csv[1] && 'reference' == csv[2]) {
    parsingStateError = true;
    return;
  }
  // throw error on next row
  if (parsingStateError) {
    if (csv.length < 3) {
      // report a truncated error row as a parser failure, not a RangeError
      throw FluxCsvParserException(
          'Unable to parse CSV response. Error row is incomplete.');
    }
    var error = csv[1];
    var reference = csv[2];
    throw FluxQueryException(error, reference);
  }
  // check csv annotations
  var token = csv[0];
  if ((annotations.contains(token) && !startNewTable) ||
      (responseMode == FluxResponseMode.onlyNames && table == null)) {
    startNewTable = true;
    table = FluxTableMetaData(tableIndex);
    tableIndex++;
    // -1 marks "no table id seen yet" for the new table
    tableId = -1;
  } else if (table == null) {
    throw FluxCsvParserException(
        'Unable to parse CSV response. FluxTable definition was not found.');
  }
  if (annotationDatatype == token) {
    _addDataTypes(table!, csv);
  } else if (annotationGroup == token) {
    // remembered here, applied when the column-name row arrives
    groups = csv;
  } else if (annotationDefault == token) {
    _addDefaultEmptyValues(table!, csv);
  } else {
    if (startNewTable) {
      if (responseMode == FluxResponseMode.onlyNames &&
          table!.columns.isEmpty) {
        // no annotations were seen: treat every column as a non-group string
        _addDataTypes(table!, csv.map((e) => 'string').toList());
        groups = csv.map((e) => 'false').toList();
      }
      // parse column names
      _addGroups(table!, groups);
      _addColumnNamesAndTags(table!, csv);
      startNewTable = false;
      return;
    }
    // table separator
    if (csv.length == 1) {
      return;
    }
    if (csv.length < 3) {
      // the table id is read from csv[2]; fail with a parser error instead
      // of a RangeError when the row is too short
      throw FluxCsvParserException(
          'Unable to parse CSV response. Data row is incomplete.');
    }
    var currentId = csv[2];
    if (tableId == -1) {
      tableId = currentId;
    }
    if (tableId != currentId) {
      // create new table with previous column headers settings
      var fluxColumns = table!.columns;
      table = FluxTableMetaData(tableIndex);
      table!.columns.addAll(fluxColumns);
      tableIndex = tableIndex + 1;
      tableId = currentId;
    }
    // create flux record from csv line
    var fluxRecord = table!.toObject(csv);
    _controller.add(fluxRecord);
  }
}