First published on MSDN on May 04, 2017
rxExecBy is designed for the case where a user has a very large data set, wants to split it into many small partitions, and wants to train a model on each partition. This is what we call "small data, many models". rxExecBy has many features and can run in several compute contexts, e.g. RxSpark, RxInSqlServer, and local. In this blog, I'm going to cover the RxSpark compute context and help you understand the details.
In RxSpark, rxExecBy supports RxTextData, RxXdfData (composite set), RxHiveData, RxParquetData and RxOrcData. I'm going to use AirlineDemoSmall.csv from the RevoScaleR package and convert it into each of these supported data sources.
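The conversion step could look roughly like the sketch below. It is a minimal outline, not the exact script behind this post: the HDFS directory "/share/demo", the Hive table name and the missingValueString setting are assumptions chosen for illustration.

```r
library(RevoScaleR)

# Connect to Spark; this also makes RxSpark the active compute context.
cc <- rxSparkConnect(reset = TRUE)
hdfs <- RxHdfsFileSystem()

# Copy the sample CSV shipped with RevoScaleR into HDFS.
# "/share/demo" is a hypothetical HDFS directory.
localCsv <- file.path(rxGetOption("sampleDataDir"), "AirlineDemoSmall.csv")
rxHadoopMakeDir("/share/demo")
rxHadoopCopyFromLocal(localCsv, "/share/demo")

# Text data source over the CSV (assumes "M" marks missing values in this file).
textData <- RxTextData("/share/demo/AirlineDemoSmall.csv",
                       firstRowIsColNames = TRUE,
                       missingValueString = "M",
                       fileSystem = hdfs)

# Convert the text data into the other supported data sources.
xdfData <- RxXdfData("/share/demo/AirlineDemoSmallXdf",
                     createCompositeSet = TRUE, fileSystem = hdfs)
rxDataStep(inData = textData, outFile = xdfData, overwrite = TRUE)

parquetData <- RxParquetData("/share/demo/AirlineDemoSmallParquet", fileSystem = hdfs)
rxDataStep(inData = textData, outFile = parquetData, overwrite = TRUE)

orcData <- RxOrcData("/share/demo/AirlineDemoSmallOrc", fileSystem = hdfs)
rxDataStep(inData = textData, outFile = orcData, overwrite = TRUE)

# "AirlineDemoSmall" is a hypothetical Hive table name.
hiveData <- RxHiveData(table = "AirlineDemoSmall")
rxDataStep(inData = textData, outFile = hiveData, overwrite = TRUE)

rxSparkDisconnect(cc)
```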
Group By Keys
The first and most important feature of rxExecBy is partitioning. Given a data source, rxExecBy can partition the data by a single key or by multiple keys. To do so, put the variable names into a character vector and pass it to the rxExecBy argument "keys".
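As a minimal sketch of such a call, assuming the CSV has already been copied to a hypothetical HDFS path, a single-key run might look like this. The UDF name countRows is made up for illustration; it imports the partition and counts its rows.

```r
library(RevoScaleR)

cc <- rxSparkConnect(reset = TRUE)

# Hypothetical HDFS location of the sample file.
textData <- RxTextData("/share/demo/AirlineDemoSmall.csv",
                       firstRowIsColNames = TRUE,
                       missingValueString = "M",
                       fileSystem = RxHdfsFileSystem())

# UDF applied to every partition: load the partition and count its rows.
countRows <- function(keys, data) {
  df <- rxImport(inData = data)
  nrow(df)
}

# Partition by a single key; the key names go into a character vector.
result <- rxExecBy(inData = textData, keys = c("DayOfWeek"), func = countRows)

length(result)  # one entry per partition

rxSparkDisconnect(cc)
```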
rxExecBy returns a list with one result per partition, so the length of the list equals the number of partitions. Each partition result is itself a list of "keys", "result" and "status":
"keys" holds the key values for that partition.
"result" is the return value of the UDF if it ran successfully.
"status" shows the run status of the UDF, "OK" or "Error", along with any runtime warning or error messages.
Take the single key c("DayOfWeek") as an example. The key has 7 values, so the returned value is a list of 7 objects, each of which is a list of 3:
"keys" is a list of 1 whose object is a factor with values from 1 to 7, mapping to "Monday", "Sunday", ....
"result" is the number of rows in the partition, as returned by nrow in the UDF.
In the multiple-key example c("DayOfWeek", "ArrDelay"), the length of the returned value shows a total of 3233 partitions. From the result of the 1st partition, we can see that "keys" is a list of 2:
1st object is factor 4, which is the value of "DayOfWeek".
2nd object is int 388, which is the value of "ArrDelay".
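A sketch of the multiple-key call and of how the pieces of one partition result can be inspected follows. The HDFS path and the UDF name countRows are assumptions for illustration; the element names "keys", "result" and "status" are the ones described above.

```r
library(RevoScaleR)

cc <- rxSparkConnect(reset = TRUE)

# Hypothetical HDFS location of the sample file.
textData <- RxTextData("/share/demo/AirlineDemoSmall.csv",
                       firstRowIsColNames = TRUE,
                       missingValueString = "M",
                       fileSystem = RxHdfsFileSystem())

# UDF applied to every partition: load the partition and count its rows.
countRows <- function(keys, data) {
  nrow(rxImport(inData = data))
}

# Partition by every distinct (DayOfWeek, ArrDelay) combination.
result <- rxExecBy(inData = textData,
                   keys = c("DayOfWeek", "ArrDelay"),
                   func = countRows)

length(result)            # total number of partitions

first <- result[[1]]
str(first$keys)           # one object per key, in the order given to "keys"
first$result              # return value of the UDF for this partition
first$status              # run status plus any error and warning messages

rxSparkDisconnect(cc)
```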
Another very handy part of each partition result is "status", which is a list of 3:
1st object is a character value that indicates whether the UDF ran successfully.
2nd object is a character value with the error message if the UDF failed.
3rd object is a character vector that contains all the warnings raised while running the UDF.
This example shows how "status" collects warning and error messages.
1st partition result shows 78875 rows, no error and warnings.
2nd partition result shows 82987 rows, no error, 1 warning message.
4th partition result shows 86159 rows, no error, 2 warning messages.
5th partition result shows NULL, because the UDF ran into a stop() call.
1st object of "status" shows "Error"
2nd object shows the error message
3rd object shows all 3 warning messages
7th partition result shows 94975 rows, no error, 3 warning messages.
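A UDF along the following lines would produce this kind of mixed outcome. It is a hypothetical reconstruction, not the original UDF: the function name noisyCount, the row thresholds, the messages and the HDFS path are all made up for illustration. Only the positional layout of "status" (run status, error message, warnings) comes from the description above.

```r
library(RevoScaleR)

cc <- rxSparkConnect(reset = TRUE)

# Hypothetical HDFS location of the sample file.
textData <- RxTextData("/share/demo/AirlineDemoSmall.csv",
                       firstRowIsColNames = TRUE,
                       missingValueString = "M",
                       fileSystem = RxHdfsFileSystem())

# Hypothetical UDF: raises more warnings as the partition grows and
# fails outright above an arbitrary size limit.
noisyCount <- function(keys, data) {
  n <- nrow(rxImport(inData = data))
  if (n > 80000) warning("partition has more than 80000 rows")
  if (n > 85000) warning("partition has more than 85000 rows")
  if (n > 90000) warning("partition has more than 90000 rows")
  if (n > 95000) stop("partition is too large: ", n, " rows")
  n
}

result <- rxExecBy(inData = textData, keys = c("DayOfWeek"), func = noisyCount)

# Walk over the partitions and print what "status" collected for each one.
for (r in result) {
  cat("key:", as.character(r$keys[[1]]), "\n")
  cat("  run status:", r$status[[1]], "\n")
  cat("  error message:", r$status[[2]], "\n")
  cat("  number of warnings:", length(r$status[[3]]), "\n")
}

rxSparkDisconnect(cc)
```

Because a failing partition only marks its own "status" as "Error", the other partitions still return their results, which makes it practical to check the status of each partition after the run instead of wrapping the whole call in error handling.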
RxXdfData as Input Data Source of UDF
The UDF is the R function that the user provides to apply to each partition. It takes "keys" and "data" as two required input parameters, where "keys" holds the partitioning values and "data" is a data source object for the corresponding partition.
In the RxSpark compute context, "data" is an RxXdfData data source:
This makes it easy to run any rx function directly, with good performance and without being memory bound, because RxXdfData is a binary format that is stored in chunks.
The user also has the flexibility to convert it to an R data.frame with an additional rxImport/rxDataStep call.
This example shows how easily rx functions can consume the RxXdfData directly.
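As a sketch of the "small data, many models" idea, the UDF below fits one linear model per partition by handing "data" straight to rxLinMod, without importing it into a data frame first. The model formula, the UDF name fitDelayModel and the HDFS path are assumptions for illustration.

```r
library(RevoScaleR)

cc <- rxSparkConnect(reset = TRUE)

# Hypothetical HDFS location of the sample file.
textData <- RxTextData("/share/demo/AirlineDemoSmall.csv",
                       firstRowIsColNames = TRUE,
                       missingValueString = "M",
                       fileSystem = RxHdfsFileSystem())

# UDF: "data" is an RxXdfData source, so rx functions can read it directly.
fitDelayModel <- function(keys, data) {
  model <- rxLinMod(ArrDelay ~ CRSDepTime, data = data)
  coef(model)  # return only the coefficients to keep the result small
}

result <- rxExecBy(inData = textData, keys = c("DayOfWeek"), func = fitDelayModel)

# Coefficients of the model trained on the first partition.
result[[1]]$result

rxSparkDisconnect(cc)
```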
Unroll funcParams to UDF
Besides the two required parameters, the UDF can take additional parameters, which allow the user to pass arbitrary values into the UDF. This is done through funcParams; here is an example.
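A minimal sketch, assuming that each named element of the funcParams list is passed to the UDF as an argument of the same name: the UDF name countAbove, its extra parameters colName and threshold, and the HDFS path are hypothetical.

```r
library(RevoScaleR)

cc <- rxSparkConnect(reset = TRUE)

# Hypothetical HDFS location of the sample file.
textData <- RxTextData("/share/demo/AirlineDemoSmall.csv",
                       firstRowIsColNames = TRUE,
                       missingValueString = "M",
                       fileSystem = RxHdfsFileSystem())

# UDF with two extra parameters after the required "keys" and "data".
countAbove <- function(keys, data, colName, threshold) {
  df <- rxImport(inData = data)
  # Count the rows whose value in colName exceeds the threshold.
  sum(df[[colName]] > threshold, na.rm = TRUE)
}

# The list elements are matched to the extra UDF parameters by name.
result <- rxExecBy(inData = textData,
                   keys = c("DayOfWeek"),
                   func = countAbove,
                   funcParams = list(colName = "ArrDelay", threshold = 60))

result[[1]]$result

rxSparkDisconnect(cc)
```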
Factor & ColInfo
Factor and colInfo are also well supported by rxExecBy; however, you need to reference the R help documents to check what is supported for each data source.
This example shows how to read a Hive data source and convert a string column into a factor. It also defines the factor levels, so any string value in that column that does not appear in the level list is treated as a missing value.
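One way to write this is sketched below. The Hive table name, the column name and the choice of levels are assumptions for illustration; the column name in colInfo has to match the name the table actually uses, which may differ in case from the CSV header.

```r
library(RevoScaleR)

cc <- rxSparkConnect(reset = TRUE)

# Hypothetical Hive table holding the airline sample data.
# Only three weekdays are declared as levels, so every other value
# of the column is treated as missing.
hiveData <- RxHiveData(
  table = "AirlineDemoSmall",
  colInfo = list(
    DayOfWeek = list(type = "factor",
                     levels = c("Monday", "Tuesday", "Wednesday"))
  )
)

# UDF applied to every partition: load the partition and count its rows.
countRows <- function(keys, data) {
  nrow(rxImport(inData = data))
}

result <- rxExecBy(inData = hiveData, keys = c("DayOfWeek"), func = countRows)

# Inspect the key value and row count of each partition.
for (r in result) {
  cat(as.character(r$keys[[1]]), ":", r$result, "\n")
}

rxSparkDisconnect(cc)
```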