diff --git a/NEWS.md b/NEWS.md
index 60cb1dd0c..5357f00b3 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -333,6 +333,8 @@
 19. Ellipsis elements like `..1` are correctly excluded when searching for variables in "up-a-level" syntax inside `[`, [#5460](https://github.com/Rdatatable/data.table/issues/5460). Thanks @ggrothendieck for the report and @MichaelChirico for the fix.

+20. Rolling functions now ensure there is no nested parallelism, which could previously occur for vectorized input and for `adaptive=TRUE`, [#7352](https://github.com/Rdatatable/data.table/issues/7352). Thanks @jangorecki for the fix.
+
 ### NOTES

 1. The following in-progress deprecations have proceeded:
diff --git a/inst/tests/froll.Rraw b/inst/tests/froll.Rraw
index 40fd8ace8..fd6d5450d 100644
--- a/inst/tests/froll.Rraw
+++ b/inst/tests/froll.Rraw
@@ -741,7 +741,7 @@ test(6000.177, frollmean(x, n, align="left"), output=c(
 nn = c(1:4,2:3,1:4)
 test(6000.178, frollmean(x, nn, adaptive=TRUE), output=c(
   "frollfunR: allocating memory for results 1x1",
-  "frollfunR: .*sequentially.*single rolling computation.*",
+  "frollfunR: .*sequentially because adaptive.*",
   "frollfunR: 1:",
   "frolladaptivemeanFast: running for input length 10, hasnf 0, narm 0",
   "frolladaptivefun: processing fun 0 algo 0 took.*",
@@ -773,7 +773,7 @@ test(6000.181, frollmean(x, n, algo="exact"), output=c(
   "frollfunR: processing.*took.*"))
 test(6000.182, frollmean(x, nn, adaptive=TRUE), output=c(
   "frollfunR: allocating memory for results 1x1",
-  "frollfunR: .*sequentially.*single rolling computation.*",
+  "frollfunR: .*sequentially because adaptive.*",
   "frollfunR: 1:",
   "frolladaptivemeanFast: running for input length 10, hasnf 0, narm 0",
   "frolladaptivemeanFast: non-finite values are present in input, re-running with extra care for NFs",
@@ -1444,6 +1444,13 @@ test(6001.731, frollvar(y, 3)[4L], 0)
 test(6001.732, frollsd(y, 3)[4L], 0)
 test(6001.733, frollvar(y, c(3,3,3,3), adaptive=TRUE)[4L], 0)
 test(6001.734, frollsd(y, c(3,3,3,3), adaptive=TRUE)[4L], 0)
+test(6001.740, frollvar(c(1.5,2.5,2,NA), c(3,3)), list(c(NA,NA,0.25,NA), c(NA,NA,0.25,NA)), output="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE)) # ensure no nested parallelism in rolling functions #7352
+test(6001.741, frollsd(c(1.5,2.5,2,NA), c(3,3)), list(c(NA,NA,0.5,NA), c(NA,NA,0.5,NA)), output="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE))
+test(6001.742, frollvar(c(1.5,2.5,2,1.5), c(3,3)), list(c(NA,NA,0.25,0.25), c(NA,NA,0.25,0.25)), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE)) # no NA - no fallback to exact
+test(6001.743, frollsd(c(1.5,2.5,2,1.5), c(3,3)), list(c(NA,NA,0.5,0.5), c(NA,NA,0.5,0.5)), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE))
+test(6001.744, frollvar(c(1.5,2.5,2,NA), 3), c(NA,NA,0.25,NA), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE)) # not vectorized - no outer parallelism
+test(6001.745, frollsd(c(1.5,2.5,2,NA), 3), c(NA,NA,0.5,NA), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE))
+test(6001.750, frollvar(c(1.5,2.5,2,1.5), rep(3,4), adaptive=TRUE), c(NA,NA,0.25,0.25), output="sequentially because adaptive=TRUE is already parallelised within each rolling computation", options=c(datatable.verbose=TRUE)) # adaptive also disables outer parallelism
 test(6001.781, frollapply(FUN=var, 1:3, 0), c(NA_real_,NA_real_,NA_real_))
 test(6001.782, frollapply(FUN=var, 1:3, 0, fill=99), c(NA_real_,NA_real_,NA_real_))
 test(6001.783, frollapply(FUN=var, c(1:2,NA), 0), c(NA_real_,NA_real_,NA_real_))
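For context, a minimal R sketch of the behaviour the new tests above assert. It is not part of the patch and assumes a data.table development build that includes these changes; everything it uses (frollvar, vectorized window widths, datatable.verbose) appears in the tests.

library(data.table)
options(datatable.verbose = TRUE)
x = c(1.5, 2.5, 2, NA)
frollvar(x, c(3, 3))  # two windows -> the outer loop over windows is parallelised; the NA
                      # forces the fast algorithm to fall back to the exact one, which now
                      # reports "running sequentially, because outer parallelism has been used"
frollvar(x, 3)        # single window -> no outer parallelism, so no such message
options(datatable.verbose = FALSE)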
diff --git a/src/data.table.h b/src/data.table.h
index 663f0adb4..a299f91f8 100644
--- a/src/data.table.h
+++ b/src/data.table.h
@@ -249,9 +249,9 @@ void frollprodFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
 void frollprodExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
 void frollmedianFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
 void frollmedianExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
-void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
-void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
-void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
+void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
+void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
+void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
 void frollsdExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
 // frolladaptive.c
diff --git a/src/froll.c b/src/froll.c
index 5b9b5b6a3..bccd2955f 100644
--- a/src/froll.c
+++ b/src/froll.c
@@ -77,14 +77,16 @@ void frollfun(rollfun_t rfun, unsigned int algo, const double *x, uint64_t nx, a
     break;
   case VAR :
     if (algo==0) {
-      frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose);
+      frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose, par); // par is used only when NAs force a fallback to exact, so the fallback knows whether outer parallelism has been applied
     } else if (algo==1) {
-      frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose);
+      if (!par) // par should be true here: frollvarExact is invoked directly, not via fallback, so algo=exact was requested explicitly and outer parallelism has already been disabled in frollR.c
+        internal_error(__func__, "par=FALSE but should be TRUE, algo=exact should have disabled outer parallelism for vectorized input so frollvarExact should be allowed to go parallel"); // # nocov
+      frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose, par);
     }
     break;
   case SD :
     if (algo==0) {
-      frollsdFast(x, nx, ans, k, fill, narm, hasnf, verbose);
+      frollsdFast(x, nx, ans, k, fill, narm, hasnf, verbose, par); // par is used only when NAs force a fallback to exact, so the fallback knows whether outer parallelism has been applied
     } else if (algo==1) {
       frollsdExact(x, nx, ans, k, fill, narm, hasnf, verbose);
     }
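As a hedged aside on the internal_error guard above: when algo="exact" is requested explicitly, frollfunR already runs the columns/windows sequentially, so the inner exact computation is free to parallelise. The following R sketch is not part of the patch and assumes frollvar exposes the same algo= argument as the other froll functions:

library(data.table)
options(datatable.verbose = TRUE)
frollvar(c(1.5, 2.5, 2, NA), c(3, 3), algo = "exact")
# expected verbose output includes messages like
#   "frollfunR: ... sequentially because algo='exact' is already parallelised within each rolling computation"
#   "frollvarExact: running in parallel for input length 4, ..."
options(datatable.verbose = FALSE)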
@@ -1146,7 +1148,7 @@ void frollprodExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill
    no support for NFs, redirecting to exact
    Welford wmean and m2 would have to be recalculated on each NF element
  */
-void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
+void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par) {
   if (verbose)
     snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", window %d, hasnf %d, narm %d\n"), "frollvarFast", (uint64_t)nx, k, hasnf, (int)narm);
   if (k == 0 || k == 1) { // var(scalar) is also NA
@@ -1205,16 +1207,16 @@ void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
   if (truehasnf) {
     if (verbose)
       snprintf(end(ans->message[0]), 500, _("%s: non-finite values are present in input, redirecting to frollvarExact using has.nf=TRUE\n"), __func__);
-    frollvarExact(x, nx, ans, k, fill, narm, /*hasnf=*/true, verbose);
+    frollvarExact(x, nx, ans, k, fill, narm, /*hasnf=*/true, verbose, par);
     return;
   }
 }

 /* fast rolling var - exact */
-void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
+void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par) {
   if (verbose)
-    snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", window %d, hasnf %d, narm %d\n"), "frollvarExact", (uint64_t)nx, k, hasnf, (int)narm);
+    snprintf(end(ans->message[0]), 500, _("%s: running %s for input length %"PRIu64", window %d, hasnf %d, narm %d\n"), "frollvarExact", par ? "in parallel" : "sequentially, because outer parallelism has been used,", (uint64_t)nx, k, hasnf, (int)narm);
   if (k == 0 || k == 1) { // var(scalar) is also NA
     if (verbose)
       snprintf(end(ans->message[0]), 500, _("%s: window width of size %d, returning all NA vector\n"), __func__, k);
@@ -1228,7 +1230,7 @@ void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
   }
   bool truehasnf = hasnf>0;
   if (!truehasnf || !narm) {
-    #pragma omp parallel for num_threads(getDTthreads(nx, true)) shared(truehasnf)
+    #pragma omp parallel for if (par) num_threads(getDTthreads(nx, true)) shared(truehasnf)
     for (uint64_t i=k-1; i<nx; i++) {
@@ ... @@
-void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
+void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par) {
   if (verbose)
     snprintf(end(ans->message[0]), 500, _("%s: calling sqrt(frollvarFast(...))\n"), "frollsdFast");
-  frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose);
+  frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose, par);
   for (uint64_t i=k-1; i<nx; i++)
     ans->dbl_v[i] = sqrt(ans->dbl_v[i]);
 }
@@ -1331,7 +1333,7 @@ void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, b
 void frollsdExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
   if (verbose)
     snprintf(end(ans->message[0]), 500, _("%s: calling sqrt(frollvarExact(...))\n"), "frollsdExact");
-  frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose);
+  frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose, /*par=*/true); // par=true because frollsdExact here is invoked directly, not via fallback, so algo=exact was requested explicitly and outer parallelism in frollR.c is already disabled. With algo=fast the path is sdFast -> varFast -> NAs -> varExact, so frollsdExact is not employed and nothing redirects to it.
   for (uint64_t i=k-1; i<nx; i++)
     ans->dbl_v[i] = sqrt(ans->dbl_v[i]);
 }
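A small R sketch (not part of the patch) of the delegation shown above: frollsd reuses the frollvar code, so the par flag travels sdFast -> varFast -> (NA fallback) -> varExact, which is what test 6001.741 checks.

library(data.table)
options(datatable.verbose = TRUE)
frollsd(c(1.5, 2.5, 2, NA), c(3, 3))
# the NA triggers the exact fallback inside each of the two window computations;
# because the windows themselves run in an outer parallel loop, the fallback
# prints "running sequentially, because outer parallelism has been used"
options(datatable.verbose = FALSE)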
diff --git a/src/frollR.c b/src/frollR.c
index 71963e6f8..afb70c3f1 100644
--- a/src/frollR.c
+++ b/src/frollR.c
@@ -193,12 +193,14 @@ SEXP frollfunR(SEXP fun, SEXP xobj, SEXP kobj, SEXP fill, SEXP algo, SEXP align,
   else
     internal_error(__func__, "invalid %s argument in %s function should have been caught earlier", "algo", "rolling"); // # nocov

-  bool par = nx*nk>1 && ialgo==0;
+  bool par = nx*nk>1 && ialgo==0 && !badaptive; // for algo=exact and for adaptive=TRUE we parallelise inside each rolling computation instead
   if (verbose) {
     if (par) {
       Rprintf(_("%s: computing %d column(s) and %d window(s) in parallel\n"), __func__, nx, nk);
     } else if (ialgo==1) {
       Rprintf(_("%s: computing %d column(s) and %d window(s) sequentially because algo='exact' is already parallelised within each rolling computation\n"), __func__, nx, nk);
+    } else if (badaptive) {
+      Rprintf(_("%s: computing %d column(s) and %d window(s) sequentially because adaptive=TRUE is already parallelised within each rolling computation\n"), __func__, nx, nk);
     } else if (nx*nk==1) {
       Rprintf(_("%s: computing %d column(s) and %d window(s) sequentially as there is only single rolling computation\n"), __func__, nx, nk);
     }
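Finally, a minimal R sketch (not part of the patch) of the new badaptive branch above, mirroring test 6001.750: with adaptive=TRUE the outer loop over columns and windows stays sequential because the adaptive kernels parallelise internally.

library(data.table)
options(datatable.verbose = TRUE)
frollvar(c(1.5, 2.5, 2, 1.5), rep(3, 4), adaptive = TRUE)
# expected verbose output includes a message like
#   "frollfunR: computing 1 column(s) and 1 window(s) sequentially because adaptive=TRUE
#    is already parallelised within each rolling computation"
options(datatable.verbose = FALSE)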