jsonstreamreader 1.0.4
This project processes local and remote json files/sources by splitting them into smaller chunks, with automatic garbage collecting.
Json Stream Reader #
Latest Features #
RemoteSource: this allows you to process online data.
Breaking Changes #
- pipe removed
- major fixes for JsonPath
- Trailing modified: for a stream {"a":1,"b":2}, the single item {"a":1,"b":2} is returned instead of the two items {"a":1} and {"b":2}
- [a:b] now matches as a range where the index is >=a and <=b; use [0-9]+ to achieve the previous effect
- $.items does not match $.items.*
A Flutter Json Stream Reader
This project processes a local json file by splitting it into smaller chunks, with automatic garbage collecting.
Classes #
Streamer #
This class defines the size of each chunk. Optional values include:
- chunksize: default is 100
- start: default is 0; defines where Reader should start parsing the file
- end: default is the end of the file; defines where Reader should stop reading the file
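To make the three options concrete, here is a minimal sketch of how a byte range could be cut into chunks. It is not the package's implementation: `chunkRanges` is a hypothetical helper, and it assumes that chunksize is measured in kilobytes (Getting Started says 100 means 100kb), that 1 KB is 1024 bytes, and that end is exclusive.

```dart
/// Hypothetical helper, not part of jsonstreamreader: lists the [from, to)
/// byte ranges visited when reading start..end in chunks of chunksize KB.
List<List<int>> chunkRanges(int start, int end, {int chunksize = 100}) {
  final step = chunksize * 1024; // assumption: 1 KB = 1024 bytes
  final ranges = <List<int>>[];
  for (var from = start; from < end; from += step) {
    // The last chunk is shortened so it never runs past end.
    final to = from + step < end ? from + step : end;
    ranges.add([from, to]);
  }
  return ranges;
}

void main() {
  // A 250 KB range read with the default chunksize is covered by three chunks.
  for (final range in chunkRanges(0, 250 * 1024)) {
    print('read bytes ${range[0]}..${range[1]}');
  }
}
```

A smaller chunksize means more, shorter reads, which keeps less data in memory at once.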
Readers #
All readers have optional values which include
- int delay
how long the reader should wait before reading the next chunk, in microseconds
-
int combine
If combine is 1, array-only values will be combined, i.e. [1,2,3] will result in {"0":1,"1":2,"2":3}. Object values in an array (e.g. [{},{}]) are not affected, which means [1,2,{"third":3}] will result in {"0":1,"1":2,"2":3, {"third":3} }
If combine is 2, nested values will be collapsed into the object closest to the root, e.g. {'first':{'second':{ "third":3 } } } will be collapsed to {'first':{'/second/third/':3 } }
-
Reader
Provides access to json data by using function callbacks.
Methods include
- progress(Function func(double progress))
- fail(Function func(dynamic err))
- done(Function func)
- trailing(Future func(dynamic value, String key))
- filter(String expression, Future<dynamic> func(dynamic value, String key))
-
StreamReader
Provides access to json data by using StreamController
Streams include
- double progress
- dynamic fail
- Null done
- <StreamItem> trailing
- <StreamItem> filter
-
StreamItem
dynamic value
String key
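The two combine modes described above can be sketched in plain Dart. This is an illustration of the documented input and output shapes only; `combineList` and `collapse` are hypothetical names, not functions exported by this package.

```dart
/// Hypothetical sketch of combine: 1. A list of plain values becomes a map
/// keyed by index.
Map<String, dynamic> combineList(List<dynamic> values) {
  return {for (var i = 0; i < values.length; i++) '$i': values[i]};
}

/// Hypothetical sketch of combine: 2. Nested objects under each top-level
/// key are collapsed into path-style keys such as '/second/third/'.
Map<String, dynamic> collapse(Map<String, dynamic> object) {
  final result = <String, dynamic>{};
  object.forEach((key, value) {
    if (value is Map<String, dynamic>) {
      final flat = <String, dynamic>{};
      _flatten(value, '/', flat);
      result[key] = flat;
    } else {
      result[key] = value;
    }
  });
  return result;
}

/// Walks node depth-first and writes every leaf to out under its path.
void _flatten(
    Map<String, dynamic> node, String prefix, Map<String, dynamic> out) {
  node.forEach((key, value) {
    final path = '$prefix$key/';
    if (value is Map<String, dynamic>) {
      _flatten(value, path, out);
    } else {
      out[path] = value;
    }
  });
}

void main() {
  print(combineList([1, 2, 3])); // {0: 1, 1: 2, 2: 3}
  print(collapse({
    'first': {
      'second': {'third': 3}
    }
  })); // {first: {/second/third/: 3}}
}
```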
Sources #
A streamer can accept either a File (dart:io) or a RemoteSource from this package.
- File
File file = new File(path.path + '/citylots.json');
Streamer s = new Streamer(file);
- RemoteSource: a remote source does not have to be a remote file, it can also be a url with a json response. NB. Setting content-length in the header will provide accurate progress values.
RemoteSource source = new RemoteSource('https://jsonplaceholder.typicode.com/posts');
Streamer s = new Streamer(source);
source.download();
or
RemoteSource source = new RemoteSource('https://raw.githubusercontent.com/zemirco/sf-city-lots-json/master/citylots.json');
Streamer s = new Streamer(source);
source.download();
NB. RemoteSource does not work with Content-disposition: attachment
The only difference is that you must call source.download(), preferably after creating the Streamer. The package sends the header Accept-Encoding: identity to determine the size of the response without downloading the file.
Other useful functions include:
- Future<int> length()
- Future<FileSystemEntity> delete()
- void setBody(Map<String, String> body)
- void addHeader(String property, String value)
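The note about content-length can be illustrated with a small calculation. The sketch below is an assumption about how progress could be derived, not the package's code: `downloadProgress` is a hypothetical helper and the 0.0 to 1.0 scale is chosen for illustration.

```dart
/// Hypothetical helper: fraction of the response received so far.
/// contentLength is the value of the content-length header, or 0 if absent.
double downloadProgress(int receivedBytes, int contentLength) {
  if (contentLength <= 0) {
    // Without a content-length the total size is unknown, so no accurate
    // value can be computed.
    return 0.0;
  }
  final fraction = receivedBytes / contentLength;
  return fraction > 1.0 ? 1.0 : fraction;
}

void main() {
  print(downloadProgress(50, 200)); // 0.25
  print(downloadProgress(50, 0)); // 0.0
}
```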
How have Trailing and Filter been changed? #
{
"a":1,
"b":2,
"c":3
}
The above would previously yield three values
- {"a":1}
- {"b":2}
- {"c":3}
This will now result in
{"a":1,"b":2,"c":3}
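If existing code still expects one item per key, the combined object can be split again inside the callback. A minimal sketch, assuming the delivered value can be treated as a `Map<String, dynamic>`; `splitEntries` is a hypothetical helper.

```dart
import 'dart:convert';

/// Hypothetical helper: turns {"a":1,"b":2} back into [{"a":1}, {"b":2}].
List<Map<String, dynamic>> splitEntries(Map<String, dynamic> object) {
  return [
    for (final entry in object.entries) {entry.key: entry.value}
  ];
}

void main() {
  final whole = jsonDecode('{"a":1,"b":2,"c":3}') as Map<String, dynamic>;
  print(splitEntries(whole)); // [{a: 1}, {b: 2}, {c: 3}]
}
```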
Getting Started #
- Create file and streamer instance
File file = new File(path.path + '/citylots.json');
Streamer streamer = new Streamer(file);
/* or Streamer streamer = new Streamer(file, chunksize: 200); */
The default chunksize is 100 (100kb).
- Create Reader instance
Reader reader = new Reader(streamer);
//or Reader reader = new Reader(streamer, delay: 100); delay is in microseconds
- Register a function to receive data; your options are
- filter(String expression, void (dynamic value, String key)) - value is either a JsonObject or a String
- trailing(void (dynamic value, String key))
- Example
getApplicationDocumentsDirectory().then((Directory path) {
File file = new File(path.path + '/citylots.json');
Streamer streamer = new Streamer(file);
Reader r = new Reader(streamer);
r.filter("\$.*", (dynamic value, String key) {
//items are received here
print(value);
})
.done(() { //called when all items have been processed
print("Completed");
}).fail((err) { //called when and if processing fails
print(err);
});
}).catchError((error) {
print(error);
});
How are keys Represented? #
Root is represented as /
- Example 1
{ "glossary": { "title": "example glossary", "GlossDiv": { "title": "S", "GlossList": { "GlossEntry": { "ID": "SGML", "SortAs": "SGML", "GlossTerm": "Standard Generalized Markup Language", "Acronym": "SGML", "Abbrev": "ISO 8879:1986", "GlossDef": { "para": "A meta-markup language, used to create markup languages such as DocBook.", "GlossSeeAlso": ["GML", "XML"] }, "GlossSee": "markup" } } } } }
- object['glossary'] has a key of /
- object['glossary']['title'] has a key of /glossary/
- object['glossary']['GlossDiv'] has a key of /glossary/
- object['glossary']['GlossDiv']['title'] has a key of /glossary/GlossDiv/
- object['glossary']['GlossDiv']['GlossList']['GlossEntry'] has a key of /glossary/GlossDiv/GlossList/
So, starting from Root (/), each key is built as Root + key + /
- Example 2
[ {"age":12}, 2, 3, 4 ]
- object[0]['age'] has a key of /0/age/
- object[1] has a key of /1/
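The "Root + key + /" rule can be written as a small function. `keyFor` is a hypothetical helper used only to show how the strings are built from path segments; which segments the package includes for a given value is as described in the examples above.

```dart
/// Hypothetical helper: joins path segments into a key, starting at root "/"
/// and appending "segment/" for each level.
String keyFor(List<Object> segments) {
  final buffer = StringBuffer('/');
  for (final segment in segments) {
    buffer
      ..write(segment)
      ..write('/');
  }
  return buffer.toString();
}

void main() {
  print(keyFor([])); // /
  print(keyFor(['glossary', 'GlossDiv'])); // /glossary/GlossDiv/
  print(keyFor([0, 'age'])); // /0/age/
}
```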
Using Expressions in filter #
reader.filter(Expression, (dynamic value, String key) {
//items are received here
print(value);
})
- If you start your expression with "/", the normal regular expression rules apply, so for object[0]['age'] I can use "\/0\/age\/". You have to escape "/".
reader.filter("\/0\/age\/", (dynamic value, String key) {
//items are received here
print(value);
})
- For all children of object['glossary']['GlossDiv'] I can use "\/glossary\/GlossDiv\/.*".
reader.filter("\/glossary\/GlossDiv\/.*", (dynamic value, String key) {
//items are received here
print(value);
})
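An expression can be tried against a list of keys with Dart's own `RegExp` before it is handed to filter. This sketch assumes the package applies "/" expressions to keys as ordinary, unanchored regular expressions; `matchingKeys` is a hypothetical helper.

```dart
/// Hypothetical helper: returns the keys that the expression matches.
List<String> matchingKeys(String expression, List<String> keys) {
  final pattern = RegExp(expression);
  return keys.where(pattern.hasMatch).toList();
}

void main() {
  final keys = [
    '/',
    '/glossary/',
    '/glossary/GlossDiv/',
    '/glossary/GlossDiv/GlossList/',
  ];
  // A raw string (r'...') keeps the backslashes exactly as typed.
  print(matchingKeys(r'\/glossary\/GlossDiv\/.*', keys));
  // [/glossary/GlossDiv/, /glossary/GlossDiv/GlossList/]
}
```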
Using JsonPath "Hack" in filter #
Expression | Path |
---|---|
$. | ^\/$ |
[0:9] | [0>= <=9]\/$ |
[:9]+ | [0>= <=9]+\/$ |
.. | \/.*\/.*\/$ |
... | \/.*\/.*\/.*\/$ |
$.* | /.* & / |
.* | .* |
[*] | .* |
{name} | any value with key "name" |
Operators #
Symbol | Meaning |
---|---|
& | AND |
\| | OR |
! | NOT |
Operations can be used with path or jsonpath combinations. Examples:
- "{name} & /items/": all direct descendants of items with at least one key name
- "{name} & $.items": all direct descendants of items with at least one key name
- "{name} & !$.items": all direct descendants from root, except for items, with at least one key name
- "{name} | !/": all objects with at least one key name that is not a direct descendant of root
- "({name} | !/) & $.items.*": all objects with at least one key name, or not a direct descendant of root, and that are descendants of items
The order does not matter. (Operations were improved using https://github.com/riichard/boolean-parser-js)
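One way to picture the operators is as combinators over simple predicates. The sketch below is not how the package parses expressions; `Matcher`, `hasKey`, `keyMatches`, `both`, `either` and `negate` are hypothetical names, and the semantics are a simplified reading of the table above.

```dart
/// Hypothetical predicate over an item's key and value.
typedef Matcher = bool Function(String key, dynamic value);

/// Rough equivalent of {name}: the value is an object containing that key.
Matcher hasKey(String name) =>
    (key, value) => value is Map && value.containsKey(name);

/// Rough equivalent of a path expression: the key matches a regular expression.
Matcher keyMatches(String pattern) {
  final regExp = RegExp(pattern);
  return (key, value) => regExp.hasMatch(key);
}

Matcher both(Matcher a, Matcher b) => // &
    (key, value) => a(key, value) && b(key, value);

Matcher either(Matcher a, Matcher b) => // |
    (key, value) => a(key, value) || b(key, value);

Matcher negate(Matcher a) => // !
    (key, value) => !a(key, value);

void main() {
  // Roughly "{name} & /items/".
  final expression = both(hasKey('name'), keyMatches(r'^/items/$'));
  print(expression('/items/', {'name': 'first'})); // true
  print(expression('/other/', {'name': 'first'})); // false
}
```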
- So for object[0]['age'] I can use "$.0.age", which is equivalent to "^\/0\/age\/$".
reader.filter("\$.0.age", (dynamic value, String key) {
//items are received here
print(value);
})
- For all children of object['glossary']['GlossDiv'] I can use "$.glossary.GlossDiv.*", which is equivalent to "\/glossary\/GlossDiv\/.*".
reader.filter("\$.glossary.GlossDiv.*", (dynamic value, String key) {
//items are received here
print(value);
})
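The two equivalences above can be reproduced with a small translation function. This is a sketch that covers only those two forms (an exact path, and a path ending in `.*`); `jsonPathToPattern` is a hypothetical helper, and the package's real translation handles more cases, as the table shows.

```dart
/// Hypothetical helper: translates "$.a.b" or "$.a.b.*" into a key pattern.
String jsonPathToPattern(String jsonPath) {
  if (!jsonPath.startsWith(r'$.')) {
    throw ArgumentError.value(jsonPath, 'jsonPath', r'must start with "$."');
  }
  var body = jsonPath.substring(2);
  final wildcard = body.endsWith('.*');
  if (wildcard) {
    body = body.substring(0, body.length - 2);
  }
  // Each segment becomes "segment/", escaped so it is matched literally.
  final path =
      body.split('.').map(RegExp.escape).map((segment) => '$segment/').join();
  return wildcard ? '/${path}.*' : '^/${path}\$';
}

void main() {
  print(jsonPathToPattern(r'$.0.age')); // ^/0/age/$
  print(jsonPathToPattern(r'$.glossary.GlossDiv.*')); // /glossary/GlossDiv/.*
}
```

In a Dart regular expression "/" does not need escaping, so `^/0/age/$` matches the same keys as `^\/0\/age\/$`.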
Future #
After doing some benchmarking I found out that these functions are very expensive, so I added futures to the mix. These futures are optional and must not be null.
How To use Futures? #
reader.filter("\$.glossary.GlossDiv.*", (dynamic value, String key) {
//items are received here
Completer<String> completer = new Completer<String>();
//complete it once your work is finished, e.g. completer.complete('done');
return completer.future;
})
Futures are always processed before the next function is called, which makes for some very interesting possibilities. How so? Well, I can do this:
- Say I have a function that returns a future after adding a list of items to a database. I would only add to the database every hundred items.
Future<void> addToDatabase(List<Map<String,String>> items) async {
//implement here
//don't forget to empty the list
items.clear();
}
With that out of the way, I now need to generate the list. I will maintain a list of items.
List<Map<String,String>> items = <Map<String,String>>[];
reader.filter("\$.glossary.GlossDiv.*", (dynamic value, String key) {
//'lastkey' stands for the final key of each record
if (items.isEmpty || items.last.containsKey('lastkey')) {
//the previous item is complete (or there is none yet), so start a new item
items.add(value);
} else {
//the previous item is still incomplete, so merge this value into it
items.last.addAll(value);
}
//now we add the items to the database if there are 100 or more of them
if (items.length > 99) {
//return the future so that it is processed before we start adding more items
return addToDatabase(items);
}
})
.done((){
if(items.length>0){
addToDatabase(items);
}
});
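The batching idea above can be tried without a database or this package. The sketch below simulates a reader that awaits each callback before delivering the next value; `deliver` and `batchSizes` are hypothetical names, and the delayed future stands in for a real database write.

```dart
import 'dart:async';

/// Calls onValue for each value and awaits its future before moving on,
/// mimicking "futures are always processed before the next function is called".
Future<void> deliver(
    List<int> values, Future<void> Function(int value) onValue) async {
  for (final value in values) {
    await onValue(value);
  }
}

/// Collects values into batches and returns the size of each simulated write.
Future<List<int>> batchSizes(int count, int batchSize) async {
  final pending = <int>[];
  final written = <int>[];

  Future<void> flush() async {
    await Future<void>.delayed(Duration.zero); // stand-in for real I/O
    written.add(pending.length);
    pending.clear();
  }

  await deliver(List.generate(count, (i) => i), (value) async {
    pending.add(value);
    if (pending.length >= batchSize) {
      await flush();
    }
  });
  // Same role as the done callback: write whatever is left over.
  if (pending.isNotEmpty) {
    await flush();
  }
  return written;
}

Future<void> main() async {
  print(await batchSizes(250, 100)); // [100, 100, 50]
}
```

Because each write is awaited before the next value arrives, the list can be cleared safely inside the write.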
What order are futures executed? #
- trailing
- filter
More about Streamer #
Streamer now has two more optional values: start and end. These indexes indicate where Streamer should start and stop reading the file. NB. Indexes start from zero.
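For comparison, dart:io offers the same idea directly through `File.openRead(start, end)`, where end is exclusive. The sketch below uses only dart:io; whether Streamer treats end as inclusive or exclusive is not stated here, so check before relying on exact offsets. `readRange` is a hypothetical helper.

```dart
import 'dart:io';

/// Reads the bytes from start (inclusive) to end (exclusive) of a file.
Future<List<int>> readRange(File file, int start, int end) async {
  final bytes = <int>[];
  await for (final chunk in file.openRead(start, end)) {
    bytes.addAll(chunk);
  }
  return bytes;
}

Future<void> main() async {
  final dir = await Directory.systemTemp.createTemp('range_demo');
  final file = File('${dir.path}/sample.json');
  await file.writeAsString('[1,2,3,4]');

  // Index 0 is "[", so this slice holds the characters at indexes 1, 2 and 3.
  final slice = await readRange(file, 1, 4);
  print(String.fromCharCodes(slice));

  await dir.delete(recursive: true);
}
```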
Multiple Streamers & Readers #
It is also possible to attach multiple streamers and readers to the same file or to multiple files.
Streamer s = new Streamer(file, chunksize: 100 /*, start: 8, end: 612007*/);
Reader r = new Reader(s, delay: 100);
r.filter('\$.*', (dynamic value, String key) {
print('first');
value = null;
key = null;
//print("value is ${value} and key is ${key} of stream 1");
}).filter('\$.*', (dynamic value, String key) {
print('second');
value = null;
key = null;
//print("value is ${value} and key is ${key} of stream 1");
}).done(() {
print('executed in ${stopwatch.elapsed}');
}).progress((double progress) {
print('progress is ${progress}');
}).fail((err) {
print(err);
});
Streamer s2 =
new Streamer(file, chunksize: 100, start: 612016, end: 1224015);
Reader r2 = new Reader(s2, delay: 0);
r2.filter('\$.*', (dynamic value, String key) {
if (items2.length < 100) {
items2.add(value);
} else if(!close2) {
close2 = true;
c2.add(items2);
}
value = null;
key = null;
//print("value is ${value} and key is ${key} of stream 2");
}).done(() {
print('executed in ${stopwatch2.elapsed}');
}).progress((double progress) {
print('progress is ${progress}');
}).fail((err) {
print("error 2 is ${err}");
print(err.stackTrace);
});
StreamReader #
StreamReader is a Reader implemented using StreamController. One of the major differences is that StreamReader does not have a pipe stream.
Streams include:
- trailing
- done as a broadcast stream
- fail as a broadcast stream
- progress as a broadcast stream
- filter(String Expression)
File file = new File(path.path + '/citylots.json');
Streamer s = new Streamer(file, chunksize: 100 /*, start: 8, end: 612007*/);
StreamReader sr = new StreamReader(s, delay: 100);
Stream<StreamItem> stream = sr.filter('\$.*');
sr.done.listen((onData){
print('completed');
});
sr.progress.listen((double onData){
print(onData);
});
stream.listen((StreamItem onData) {
print(onData.value);
print(onData.key);
});
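The difference between an ordinary stream and a broadcast stream can be seen with `StreamController` from dart:async alone. In this sketch `Item` is a hypothetical stand-in for StreamItem (a value plus its key) and `itemStream` is a hypothetical helper; neither is this package's code.

```dart
import 'dart:async';

/// Hypothetical stand-in for StreamItem.
class Item {
  final dynamic value;
  final String key;
  Item(this.value, this.key);
}

/// Emits the given items on a single-subscription stream, then closes it.
/// Events are buffered until a listener subscribes.
Stream<Item> itemStream(List<Item> items) {
  final controller = StreamController<Item>();
  items.forEach(controller.add);
  controller.close();
  return controller.stream;
}

Future<void> main() async {
  // A broadcast controller accepts any number of listeners.
  final done = StreamController<void>.broadcast();
  done.stream.listen((_) => print('first listener: completed'));
  done.stream.listen((_) => print('second listener: completed'));

  await for (final item in itemStream([Item(1, '/a/'), Item(2, '/b/')])) {
    print('${item.key} -> ${item.value}');
  }
  done.add(null);
  await done.close();
}
```

A single-subscription stream such as the one returned by `itemStream` can only be listened to once, which is why completion, failure and progress are the ones exposed as broadcast streams.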
Benchmarks #
- 02.json takes 2 milliseconds to execute.
- citylots.json takes 8 minutes to execute; this is mostly because of how complex the dataset is.
- www.carqueryapi.com takes about 2.3 milliseconds. Please note that the file had to be modified: I removed ?( from the beginning and ); from the end.
Execution time depends exclusively on the complexity of the file and the number of items to be processed, i.e. a file 3 times larger than citylots.json can take approximately five minutes to execute. Using combine:2 can speed up processing considerably.
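Timings like these can be collected with `Stopwatch` from dart:core, which is also what the `stopwatch.elapsed` calls in the examples rely on. A minimal sketch; `timed` is a hypothetical helper and the delayed future stands in for a real read.

```dart
/// Runs task and returns how long it took to complete.
Future<Duration> timed(Future<void> Function() task) async {
  final stopwatch = Stopwatch()..start();
  await task();
  stopwatch.stop();
  return stopwatch.elapsed;
}

Future<void> main() async {
  final elapsed = await timed(() async {
    // Stand-in workload; replace with the code being measured.
    await Future<void>.delayed(const Duration(milliseconds: 20));
  });
  print('executed in $elapsed');
}
```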
Assigning Multiple Readers #
File file = new File(path.path + '/file.json');
Streamer s = new Streamer(file, chunksize: 100, start: 8, end: 6170007);
Reader r = new Reader(s, delay: 10);
int item = 0;
r
.filter('\$.*', (dynamic value, String key) {
value = null;
key = null;
//print("value is ${value} and key is ${key} of stream 2");
}).done(() {
print('${item} executed in ${stopwatch.elapsed}');
}).progress((double progress) {
print('progress is ${progress}');
}).fail((err) {
print(err);
print(err.stackTrace);
});
Streamer s2 =
new Streamer(file, chunksize: 100, start: 6170016, end: 12340015);
Reader r2 = new Reader(s2, delay: 0);
r2.filter('\$.*', (dynamic value, String key) {
value = null;
key = null;
//print("value is ${value} and key is ${key} of stream 2");
}).done(() {
print('executed in ${stopwatch2.elapsed}');
}).progress((double progress) {
print('progress is ${progress}');
}).fail((err) {
print("error 2 is ${err}");
print(err.stackTrace);
});
This project was tested using
- https://github.com/zemirco/sf-city-lots-json/blob/master/citylots.json
- https://github.com/thaiwsa/aws-speed/blob/master/JsonProcess/jsondata/02.json
- http://www.carqueryapi.com/api/0.3/?callback=?&cmd=getMakes&year=1970&sold_in_us=1&utm_medium=referral&utm_campaign=ZEEF&utm_source=https%3A%2F%2Fjson-datasets.zeef.com%2Fjdorfman
- https://catalogue.data.gov.bc.ca/dataset/children-and-family-development-cases-in-care-demographics