2010-10-11 17 views
6

J'utilise RPostgreSQL pour me connecter à une base de données locale. La configuration fonctionne très bien sur ma machine Linux. R 2.11.1, Postgres 8.4. Je jouais avec le 'foreach' avec le backend parallèle multicore (doMC) pour emballer quelques requêtes répétitives (quelques milliers) et ajouter les résultats dans une structure de données. Curieusement, cela fonctionne si j'utilise% do% mais échoue quand je passe à% dopar%, à l'exception quand il n'y a qu'une seule itération (comme montré ci-dessous)foreach% dopar% + RPostgreSQL

Je me demandais si cela avait quelque chose à voir avec un seul objet de connexion, donc j'ai créé 10 objets de connexion et en fonction de ce que j'étais, un certain objet con a été donné pour cette requête, en fonction de i modulo 10. (indiqué ci-dessous par seulement 2 objets de connexion). L'expression évaluée eval (expr.01) contient/est la requête qui dépend de ce que 'i' est.

Je n'arrive pas à comprendre ces messages d'erreur particuliers. Je me demande s'il existe un moyen de faire ce travail.

Merci.
Vishal Belsare

extrait de R suit:

> id.qed2.foreach <- foreach(i = 1588:1588, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters}; 
> id.qed2.foreach 
[[1]] 
    [1] 411 414 2140 2406 4490 4507 4519 4570 4571 4572 4703 4731 
[109] 48765 84312 91797 

> id.qed2.foreach <- foreach(i = 1588:1589, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters}; 
Error in stop(paste("expired", class(con))) : 
    no function to return from, jumping to top level 
Error in stop(paste("expired", class(con))) : 
    no function to return from, jumping to top level 
Error in { : 
    task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'" 
> 

EDIT: J'ai changé quelques petites choses, (toujours sans succès), mais quelques petites choses viennent à la lumière. Les objets de connexion créés dans la boucle et non «déconnectés» via dbDisconnect, conduisent à des connexions suspendues, comme le montre/var/log pour Postgres. Quelques nouveaux messages d'erreur apparaissent quand je fais ceci:

> system.time(
+ id.qed2.foreach <- foreach(i = 1588:1590, .inorder=FALSE, 
.packages=c("DBI", "RPostgreSQL")) %dopar% {drv0 <- dbDriver("PostgreSQL"); 
con0 <- dbConnect(drv0, dbname='nseindia'); 
list(idreuters=fetch(dbSendQuery(con0,eval(expr.01)),n=-1)$idreuters); 
dbDisconnect(con0)}) 
Error in postgresqlExecStatement(conn, statement, ...) : 
    no function to return from, jumping to top level 
Error in postgresqlExecStatement(conn, statement, ...) : 
    no function to return from, jumping to top level 
Error in postgresqlExecStatement(conn, statement, ...) : 
    no function to return from, jumping to top level 
Error in { : 
    task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'" 
+0

Si vous l'avez résolu, veuillez poster votre solution en réponse ci-dessous et la marquer comme acceptée. Ce sera utile pour référence future. – BoltClock

Répondre

2

Les travaux suivants et vitesses par ~ 1,5x sur une forme séquentielle. À l'étape suivante, je me demande s'il est possible d'attacher un objet de connexion à chacun des travailleurs générés par registerDoMC. Si c'est le cas, il n'y aurait pas besoin de créer/détruire les objets de connexion, ce qui évite de surcharger le serveur PostgreSQL avec des connexions.

pgparquery <- function(i) { 
drv <- dbDriver("PostgreSQL"); 
con <- dbConnect(drv, dbname='nsdq'); 
lst <- eval(expr.01); #contains the SQL query which depends on 'i' 
qry <- dbSendQuery(con,lst); 
tmp <- fetch(qry,n=-1); 
dt <- dates.qed2[i] 
dbDisconnect(con); 
result <- list(date=dt, idreuters=tmp$idreuters) 
return(result)} 

id.qed.foreach <- foreach(i = 1588:3638, .inorder=FALSE, .packages=c("DBI", "RPostgreSQL")) %dopar% {pgparquery(i)} 

-
Vishal Belsare

+1

Vishal, avez-vous déjà été capable de trouver comment attacher un objet de connexion persistante à chaque travailleur comme vous l'avez décrit ci-dessus? J'ai aussi eu ce problème. Intéressant que j'ai pu obtenir 'dbListTables (conn)' pour travailler sur les ouvriers mais les commandes de type 'dbgetQuery (conn, ...)' ne semblent pas fonctionner pour un objet de connexion attaché. – Noah

+0

@Noah Vous pourriez essayer d'utiliser la technique que je décris dans ma réponse. –

12

Il est plus efficace de créer la connexion de base de données une fois par travailleur, plutôt qu'une fois par tâche. Malheureusement, mclapply ne fournit pas un mécanisme pour initialiser les travailleurs avant d'exécuter des tâches, donc ce n'est pas facile à faire en utilisant le backend doMC, mais si vous utilisez le backend doParallel, vous pouvez initialiser les workers en utilisant clusterEvalQ. Voici un exemple de la façon de restructurer le code:

library(doParallel) 
cl <- makePSOCKcluster(detectCores()) 
registerDoParallel(cl) 

clusterEvalQ(cl, { 
    library(DBI) 
    library(RPostgreSQL) 
    drv <- dbDriver("PostgreSQL") 
    con <- dbConnect(drv, dbname="nsdq") 
    NULL 
}) 

id.qed.foreach <- foreach(i=1588:3638, .inorder=FALSE, 
          .noexport="con", 
          .packages=c("DBI", "RPostgreSQL")) %dopar% { 
    lst <- eval(expr.01) #contains the SQL query which depends on 'i' 
    qry <- dbSendQuery(con, lst) 
    tmp <- fetch(qry, n=-1) 
    dt <- dates.qed2[i] 
    list(date=dt, idreuters=tmp$idreuters) 
} 

clusterEvalQ(cl, { 
    dbDisconnect(con) 
}) 

Depuis doParallel et clusterEvalQ utilisent le même objet de cluster cl, la boucle foreach aura accès à l'objet de connexion de base de données con lors de l'exécution des tâches.

+0

Steve, c'est utile. Je me demande comment vous êtes arrivé à cette question plutôt ancienne après tout ce temps! –

+0

Existe-t-il un moyen d'utiliser foreach pour transmettre le code d'installation de la base de données à chaque worker?Je voudrais essayer d'utiliser ce code avec un backend parallèle arbitraire. – Zach

+0

@Zach Il n'y a pas de moyen indépendant d'initialiser les travailleurs, et cela m'a toujours dérangé. J'ai expérimenté avec des mécanismes spécifiques au backend (dans doMPI, par exemple), mais cela n'est jamais devenu une fonctionnalité générale, en partie parce que l'implémentation avec doMC serait probablement très moche puisque mclapply ne supporte pas l'initialisation du worker. –