Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@

19. Ellipsis elements like `..1` are correctly excluded when searching for variables in "up-a-level" syntax inside `[`, [#5460](https://github.com/Rdatatable/data.table/issues/5460). Thanks @ggrothendieck for the report and @MichaelChirico for the fix.

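20. Rolling functions now ensure there is no nested parallelism. Previously it could happen for vectorized input (multiple columns and/or windows), either with `adaptive=TRUE` or when `frollvar`/`frollsd` fell back to the exact algorithm because of non-finite values in the input, [#7352](https://github.com/Rdatatable/data.table/issues/7352). Thanks @jangorecki for the fix.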

### NOTES

1. The following in-progress deprecations have proceeded:
Expand Down
11 changes: 9 additions & 2 deletions inst/tests/froll.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ test(6000.177, frollmean(x, n, align="left"), output=c(
nn = c(1:4,2:3,1:4)
test(6000.178, frollmean(x, nn, adaptive=TRUE), output=c(
"frollfunR: allocating memory for results 1x1",
"frollfunR: .*sequentially.*single rolling computation.*",
"frollfunR: .*sequentially because adaptive.*",
"frollfunR: 1:",
"frolladaptivemeanFast: running for input length 10, hasnf 0, narm 0",
"frolladaptivefun: processing fun 0 algo 0 took.*",
Expand Down Expand Up @@ -773,7 +773,7 @@ test(6000.181, frollmean(x, n, algo="exact"), output=c(
"frollfunR: processing.*took.*"))
test(6000.182, frollmean(x, nn, adaptive=TRUE), output=c(
"frollfunR: allocating memory for results 1x1",
"frollfunR: .*sequentially.*single rolling computation.*",
"frollfunR: .*sequentially because adaptive.*",
"frollfunR: 1:",
"frolladaptivemeanFast: running for input length 10, hasnf 0, narm 0",
"frolladaptivemeanFast: non-finite values are present in input, re-running with extra care for NFs",
Expand Down Expand Up @@ -1444,6 +1444,13 @@ test(6001.731, frollvar(y, 3)[4L], 0)
test(6001.732, frollsd(y, 3)[4L], 0)
test(6001.733, frollvar(y, c(3,3,3,3), adaptive=TRUE)[4L], 0)
test(6001.734, frollsd(y, c(3,3,3,3), adaptive=TRUE)[4L], 0)
test(6001.740, frollvar(c(1.5,2.5,2,NA), c(3,3)), list(c(NA,NA,0.25,NA), c(NA,NA,0.25,NA)), output="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE)) # ensure no nested parallelism in rolling functions #7352
test(6001.741, frollsd(c(1.5,2.5,2,NA), c(3,3)), list(c(NA,NA,0.5,NA), c(NA,NA,0.5,NA)), output="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE))
test(6001.742, frollvar(c(1.5,2.5,2,1.5), c(3,3)), list(c(NA,NA,0.25,0.25), c(NA,NA,0.25,0.25)), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE)) # no NA - no fallback to exact
test(6001.743, frollsd(c(1.5,2.5,2,1.5), c(3,3)), list(c(NA,NA,0.5,0.5), c(NA,NA,0.5,0.5)), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE))
test(6001.744, frollvar(c(1.5,2.5,2,NA), 3), c(NA,NA,0.25,NA), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE)) # not vectorized - no outer parallelism
test(6001.745, frollsd(c(1.5,2.5,2,NA), 3), c(NA,NA,0.5,NA), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE))
test(6001.750, frollvar(c(1.5,2.5,2,1.5), rep(3,4), adaptive=TRUE), c(NA,NA,0.25,0.25), output="sequentially because adaptive=TRUE is already parallelised within each rolling computation", options=c(datatable.verbose=TRUE)) # adaptive also disables outer parallelism
test(6001.781, frollapply(FUN=var, 1:3, 0), c(NA_real_,NA_real_,NA_real_))
test(6001.782, frollapply(FUN=var, 1:3, 0, fill=99), c(NA_real_,NA_real_,NA_real_))
test(6001.783, frollapply(FUN=var, c(1:2,NA), 0), c(NA_real_,NA_real_,NA_real_))
Expand Down
6 changes: 3 additions & 3 deletions src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,9 @@ void frollprodFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
void frollprodExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
void frollmedianFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
void frollmedianExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
void frollsdExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);

// frolladaptive.c
Expand Down
24 changes: 12 additions & 12 deletions src/froll.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ void frollfun(rollfun_t rfun, unsigned int algo, const double *x, uint64_t nx, a
break;
case VAR :
if (algo==0) {
frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose);
frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose, par); // par is used only when NAs - fallback to exact, to know if outer parallelism has been applied
} else if (algo==1) {
frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose);
frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose, /*par=*/ true); // par=true because frollvarExact is invoked directly here, not via fallback: algo='exact' has been requested explicitly, so outer parallelism in frollR.c is already disabled
}
break;
case SD :
if (algo==0) {
frollsdFast(x, nx, ans, k, fill, narm, hasnf, verbose);
frollsdFast(x, nx, ans, k, fill, narm, hasnf, verbose, par); // par is used only when NAs - fallback to exact, to know if outer parallelism has been applied
} else if (algo==1) {
frollsdExact(x, nx, ans, k, fill, narm, hasnf, verbose);
}
Expand Down Expand Up @@ -1146,7 +1146,7 @@ void frollprodExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill
no support for NFs, redirecting to exact
Welford wmean and m2 would have to be recalculated on each NF element
*/
void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par) {
if (verbose)
snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", window %d, hasnf %d, narm %d\n"), "frollvarFast", (uint64_t)nx, k, hasnf, (int)narm);
if (k == 0 || k == 1) { // var(scalar) is also NA
Expand Down Expand Up @@ -1205,16 +1205,16 @@ void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
if (truehasnf) {
if (verbose)
snprintf(end(ans->message[0]), 500, _("%s: non-finite values are present in input, redirecting to frollvarExact using has.nf=TRUE\n"), __func__);
frollvarExact(x, nx, ans, k, fill, narm, /*hasnf=*/true, verbose);
frollvarExact(x, nx, ans, k, fill, narm, /*hasnf=*/true, verbose, par);
return;
}
}

/* fast rolling var - exact
*/
void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par) {
if (verbose)
snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", window %d, hasnf %d, narm %d\n"), "frollvarExact", (uint64_t)nx, k, hasnf, (int)narm);
snprintf(end(ans->message[0]), 500, _("%s: running %s for input length %"PRIu64", window %d, hasnf %d, narm %d\n"), "frollvarExact", par ? "in parallel" : "sequentially, because outer parallelism has been used,", (uint64_t)nx, k, hasnf, (int)narm);
if (k == 0 || k == 1) { // var(scalar) is also NA
if (verbose)
snprintf(end(ans->message[0]), 500, _("%s: window width of size %d, returning all NA vector\n"), __func__, k);
Expand All @@ -1228,7 +1228,7 @@ void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
}
bool truehasnf = hasnf>0;
if (!truehasnf || !narm) {
#pragma omp parallel for num_threads(getDTthreads(nx, true)) shared(truehasnf)
#pragma omp parallel for if (par) num_threads(getDTthreads(nx, true)) shared(truehasnf)
for (uint64_t i=k-1; i<nx; i++) {
if (narm && truehasnf) {
continue;
Expand Down Expand Up @@ -1271,7 +1271,7 @@ void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
}
}
if (truehasnf && narm) {
#pragma omp parallel for num_threads(getDTthreads(nx, true))
#pragma omp parallel for if (par) num_threads(getDTthreads(nx, true))
for (uint64_t i=k-1; i<nx; i++) {
long double wsum = 0.0;
int nc = 0;
Expand Down Expand Up @@ -1317,10 +1317,10 @@ void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill,

/* fast rolling sd - fast
*/
void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par) {
if (verbose)
snprintf(end(ans->message[0]), 500, _("%s: calling sqrt(frollvarFast(...))\n"), "frollsdFast");
frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose);
frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose, par);
for (uint64_t i=k-1; i<nx; i++) {
ans->dbl_v[i] = sqrt(ans->dbl_v[i]);
}
Expand All @@ -1331,7 +1331,7 @@ void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, b
void frollsdExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
if (verbose)
snprintf(end(ans->message[0]), 500, _("%s: calling sqrt(frollvarExact(...))\n"), "frollsdExact");
frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose);
frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose, /*par=*/ true); // par=true because frollsdExact is invoked directly, not via fallback: algo='exact' has been requested explicitly, so outer parallelism in frollR.c is already disabled
for (uint64_t i=k-1; i<nx; i++) {
ans->dbl_v[i] = sqrt(ans->dbl_v[i]);
}
Expand Down
4 changes: 3 additions & 1 deletion src/frollR.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,14 @@ SEXP frollfunR(SEXP fun, SEXP xobj, SEXP kobj, SEXP fill, SEXP algo, SEXP align,
else
internal_error(__func__, "invalid %s argument in %s function should have been caught earlier", "algo", "rolling"); // # nocov

bool par = nx*nk>1 && ialgo==0;
bool par = nx*nk>1 && ialgo==0 && !badaptive; // no outer parallelism for algo='exact' or adaptive=TRUE: those are already parallelised within each rolling computation
if (verbose) {
if (par) {
Rprintf(_("%s: computing %d column(s) and %d window(s) in parallel\n"), __func__, nx, nk);
} else if (ialgo==1) {
Rprintf(_("%s: computing %d column(s) and %d window(s) sequentially because algo='exact' is already parallelised within each rolling computation\n"), __func__, nx, nk);
} else if (badaptive) {
Rprintf(_("%s: computing %d column(s) and %d window(s) sequentially because adaptive=TRUE is already parallelised within each rolling computation\n"), __func__, nx, nk);
} else if (nx*nk==1) {
Rprintf(_("%s: computing %d column(s) and %d window(s) sequentially as there is only single rolling computation\n"), __func__, nx, nk);
}
Expand Down
Loading