#!/usr/bin/env python
"""
Simple example script that watches a folder for new files, imports that data
to a data library, and then executes a workflow on it, creating a new history
for each workflow invocation. This assumes a workflow with only one input,
though it could be adapted to many.

Sample call:
python example_watch_folder.py <api_key> <api_url> /tmp/g_inbox/ /tmp/g_inbox/done/ "API Imports" f2db41e1fa331b3e

NOTE: The upload method used requires the data library filesystem upload
option (allow_library_path_paste) to be enabled in Galaxy's configuration.
"""
import os
import shutil
import sys
import time

from common import (
    display,
    submit,
)


def main(api_key, api_url, in_folder, out_folder, data_library, workflow_id):
    # Find or create the data library with the given name. Datasets are
    # placed in its root folder "/".
    libs = display(api_key, api_url + "libraries", return_formatted=False)
    library_id = None
    for library in libs:
        if library["name"] == data_library:
            library_id = library["id"]
    if not library_id:
        lib_create_data = {"name": data_library}
        library = submit(api_key, api_url + "libraries", lib_create_data, return_formatted=False)
        library_id = library[0]["id"]
    library_folder_id = None
    folders = display(api_key, api_url + f"libraries/{library_id}/contents", return_formatted=False)
    for f in folders:
        if f["name"] == "/":
            library_folder_id = f["id"]
    workflow = display(api_key, api_url + f"workflows/{workflow_id}", return_formatted=False)
    if not workflow:
        print(f"Workflow {workflow_id} not found, terminating.")
        sys.exit(1)
    if not library_id or not library_folder_id:
        print("Failure to configure library destination.")
        sys.exit(1)
    while True:
        # Watch in_folder: upload anything that shows up there to the data
        # library to get an ldda, invoke the workflow on it, then move the
        # file to out_folder.
        for fname in os.listdir(in_folder):
            fullpath = os.path.join(in_folder, fname)
            if os.path.isfile(fullpath):
                data = {
                    "folder_id": library_folder_id,
                    "file_type": "auto",
                    "dbkey": "",
                    "upload_option": "upload_paths",
                    "filesystem_paths": fullpath,
                    "create_type": "file",
                }
                libset = submit(api_key, api_url + f"libraries/{library_id}/contents", data, return_formatted=False)
                # TODO: Handle this better, but without this pause the
                # datatype isn't always set before the follow-up workflow
                # execution.
                time.sleep(5)
                for ds in libset:
                    if "id" in ds:
                        # Successful upload of the dataset; we have the ldda
                        # now, so run the workflow.
                        wf_data = {
                            "workflow_id": workflow["id"],
                            "history": "{} - {}".format(fname, workflow["name"]),
                            "ds_map": {},
                        }
                        for step_id in workflow["inputs"].keys():
                            wf_data["ds_map"][step_id] = {"src": "ld", "id": ds["id"]}
                        res = submit(api_key, api_url + "workflows", wf_data, return_formatted=False)
                        if res:
                            print(res)
                            # Successful workflow execution; safe to move the
                            # dataset out of the watched folder.
                            shutil.move(fullpath, os.path.join(out_folder, fname))
        time.sleep(10)


if __name__ == "__main__":
    try:
        api_key = sys.argv[1]
        api_url = sys.argv[2]
        in_folder = sys.argv[3]
        out_folder = sys.argv[4]
        data_library = sys.argv[5]
        workflow = sys.argv[6]
    except IndexError:
        print(f"usage: {os.path.basename(sys.argv[0])} key url in_folder out_folder data_library workflow")
        sys.exit(1)
    main(api_key, api_url, in_folder, out_folder, data_library, workflow)
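
# For reference: `display` and `submit` are imported from the `common` module
# that ships alongside Galaxy's example API scripts; it is not defined here.
# Inferred from how this script uses them (an assumption, not a definitive
# spec of that module), they behave roughly as:
#
#   display(api_key, url, return_formatted=False)
#       GET `url` with the API key attached; with return_formatted=False,
#       return the parsed JSON response rather than printing it.
#   submit(api_key, url, data, return_formatted=False)
#       POST `data` as JSON to `url` and likewise return the parsed JSON
#       response.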