Je suis le développeur de Ruffus. Je ne suis pas sûr de comprendre tout à fait ce que vous essayez de faire mais voici:
Attendre que des travaux qui prennent un temps différent pour terminer la prochaine étape de votre pipeline est exactement ce que Ruffus est à peu près c'est, espérons-le, simple.
La première question est de savoir quels fichiers sont créés en amont, c'est-à-dire avant que le pipeline ne soit exécuté? Commençons par supposer que vous faites. Nous allons écrire une fonction fictive qui crée un fichier chaque fois qu'il est appelé. Dans Ruffus, les noms des fichiers d'entrée et de sortie sont respectivement contenus dans les deux premiers paramètres. Nous avons pas de nom de fichier d'entrée, de sorte que nos appels de fonction devrait ressembler à ceci:
create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")
La définition de CREATE_FILE ressemblerait à ceci:
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
open(output_file_name, "w").write("dummy file")
Chacun de ces fichiers seraient créés en 3 appels distincts pour créer un fichier. Ceux-ci peuvent être exécutés en parallèle si vous le souhaitez.
pipeline_run([create_file], multiprocess = 5)
Maintenant, pour combiner les fichiers. Le décorateur "@Merge" est en effet mis en place précisément pour cela. Nous avons juste besoin de le relier à la fonction précédente:
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())
Cela ne MERGE_FILE appel lorsque tous les fichiers sont prêts à partir des trois appels à CREATE_FILE().
Tout le code est le suivant:
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
from random import randint
from time import sleep
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
# simulate create file process of indeterminate complexity
sleep(randint(1,5))
open(output_file_name, "w").write("dummy file")
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())
pipeline_run([merge_file], multiprocess = 5)
Et voici le résultat:
>>> pipeline_run([merge_file], multiprocess = 5)
Job = [None -> two.file] completed
Job = [None -> three.file] completed
Job = [None -> one.file] completed
Completed Task = create_file
Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file