package edu.sdsc.globusauth.controller;

/**
 * Created by mzhuang on 05/07/2021.
 */

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Stream;

import org.ngbw.sdk.Workbench;
import org.ngbw.sdk.api.tool.ToolResource;
import org.ngbw.sdk.common.util.GsiSSHProcessRunner;
import org.ngbw.sdk.core.types.DataFormat;
import org.ngbw.sdk.core.types.DataType;
import org.ngbw.sdk.core.types.EntityType;
import org.ngbw.sdk.database.CipresDataParsingScheduler;
import org.ngbw.sdk.database.DataParseStatus;
import org.ngbw.sdk.database.Folder;
import org.ngbw.sdk.database.UserDataItem;
import org.ngbw.web.actions.NgbwSupport;

/**
 * Moves files found under a user's inbound directory into that user's workbench folders.
 *
 * <p>The directory layout this class reads is
 * {@code <loadGlobusFiles.acl.root>/<username>/inbound/<folderLabel>/<name>_<expectedBytes>/...}.
 * A transfer directory is only picked up once the regular files beneath it add up to at least
 * the byte count encoded after the underscore in its name.
 *
 * <p>Typical use: construct, call {@link #setUsername(String)}, then
 * {@link #uploadGlobusData()}.
 */
public class GlobusDataImportManager {

    private static final Log log = LogFactory.getLog(GlobusDataImportManager.class.getName());

    /** Value of the {@code loadGlobusFiles.acl.root} workbench property; null when unset. */
    private static String m_acl_root = null;

    /** User whose inbound directory is scanned; must be set before uploading. */
    private String username = null;

    /**
     * Size threshold in bytes (property {@code loadGlobusFiles.file.size}). Files larger than
     * this are stored via {@code setDataFromLocalFile}, smaller ones via {@code setData}.
     */
    private static long fileSizeLevel = 314572800L;

    private static final String EXPANSE = "expanse";
    private static final String LOGIN = "login";

    static {
        Workbench wb = Workbench.getInstance();
        m_acl_root = wb.getProperties().getProperty("loadGlobusFiles.acl.root");

        String fileSizeLevelStr = wb.getProperties().getProperty("loadGlobusFiles.file.size");
        if (fileSizeLevelStr != null && !fileSizeLevelStr.isEmpty()) {
            try {
                fileSizeLevel = Long.parseLong(fileSizeLevelStr);
            } catch (NumberFormatException e) {
                // Keep the compiled-in default when the property is not a valid long.
                log.error("Property loadGlobusFiles.file.size is not correctly set with value: "
                        + fileSizeLevelStr, e);
                log.error("Default to 314572800");
            }
        }
        log.info("Property loadGlobusFiles.file.size=" + fileSizeLevel);
    }

    /**
     * Creates a manager. Logs an error when the {@code loadGlobusFiles.acl.root} property is
     * missing; in that case {@link #uploadGlobusData()} does nothing and returns false.
     */
    public GlobusDataImportManager() {
        if (m_acl_root == null) {
            log.error("Missing workbench property: loadGlobusFiles.acl.root");
        }
    }

    /**
     * Sets the user whose inbound directory will be scanned.
     *
     * @param username name of the user; also the name of the directory under the ACL root
     */
    public void setUsername(String username) {
        this.username = username;
    }

    /**
     * Scans the configured user's inbound directory and uploads every file of every completed
     * transfer directory.
     *
     * @return true if at least one file was uploaded; false if nothing was uploaded, or if the
     *         ACL root property or the username is not set
     */
    public boolean uploadGlobusData() {
        if (m_acl_root == null || username == null) {
            return false;
        }

        Map<String, Map<Pair<String, String>, List<File>>> filesToUpload = findResultsReady();
        if (filesToUpload == null || filesToUpload.isEmpty()) {
            return false;
        }
        return process(filesToUpload);
    }

    /**
     * Uploads the collected files and removes source directories that end up empty.
     *
     * @param mml per-username map of (destination folder label, source directory) to the files
     *            found under that source directory
     * @return true if at least one file was uploaded successfully
     */
    private boolean process(Map<String, Map<Pair<String, String>, List<File>>> mml) {
        boolean processSuccessful = false;
        if (mml == null || mml.isEmpty()) {
            return processSuccessful;
        }

        for (Map.Entry<String, Map<Pair<String, String>, List<File>>> userEntry : mml.entrySet()) {
            String k = userEntry.getKey();
            log.info("User:" + k);

            org.ngbw.sdk.database.User user;
            try {
                user = org.ngbw.sdk.database.User.findUser(k);
            } catch (Exception e) {
                log.error("Exception caught while looking up for user " + k, e);
                continue;
            }
            if (user == null) {
                // uploadFile() rejects a null user for every file, so skip the user up front.
                log.error("No user record found for " + k);
                continue;
            }

            for (Map.Entry<Pair<String, String>, List<File>> dirEntry : userEntry.getValue().entrySet()) {
                String destinationLabel = dirEntry.getKey().getKey();
                String sourceDir = dirEntry.getKey().getValue();
                log.info("Destination folder: " + destinationLabel + " Source Directory: " + sourceDir);

                for (File afile : dirEntry.getValue()) {
                    log.info("Uploading file: " + afile.getAbsolutePath());
                    // Non-short-circuit on purpose: every file must be attempted.
                    processSuccessful |= uploadFile(user, destinationLabel, afile);
                }

                removeIfDrained(sourceDir);
            }
            log.info("-----------------------------------");
        }
        return processSuccessful;
    }

    /**
     * Deletes a source directory tree when no regular file is left anywhere beneath it.
     * Successfully uploaded files are deleted by {@link #uploadFile}, so remaining files mean
     * some upload failed and the tree is kept.
     *
     * @param sourceDir path of the transfer directory to clean up
     */
    private void removeIfDrained(String sourceDir) {
        List<File> files = new ArrayList<File>();
        List<String> directories = new ArrayList<String>();
        listf(sourceDir, files, directories);

        for (String dir : directories) {
            log.info("dir = " + dir);
        }
        if (!files.isEmpty()) {
            return;
        }
        // listf() adds a directory after its children, so children are deleted first.
        for (String d : directories) {
            if (!new File(d).delete()) {
                log.warn("Could not delete directory: " + d);
            }
        }
    }

    /**
     * Recursively collects the regular files and the directories under a directory.
     *
     * @param directoryName directory to list
     * @param files         receives every regular file found
     * @param directories   receives every directory path, each one after its sub-directories,
     *                      with {@code directoryName} itself last
     */
    private void listf(String directoryName, List<File> files, List<String> directories) {
        File[] fList = new File(directoryName).listFiles();
        if (fList != null) {
            for (File file : fList) {
                if (file.isFile()) {
                    files.add(file);
                } else if (file.isDirectory()) {
                    listf(file.getAbsolutePath(), files, directories);
                }
            }
        }
        directories.add(directoryName);
    }

    /**
     * Runs {@code chmod -R g=rwx} on the inbound directory through the runner configured with
     * the "expanse" tool resource's login.
     *
     * @param inbound path of the inbound directory
     * @return false if the command exits non-zero or an exception is thrown; true otherwise,
     *         including when the tool resource or its login is missing or the runner cannot be
     *         configured (the command is simply not run in those cases)
     */
    private boolean chmodInbound(String inbound) {
        ToolResource trExpanse =
                Workbench.getInstance().getServiceFactory().getToolRegistry().getToolResource(EXPANSE);
        log.info("chmodInbound(): " + (trExpanse == null ? "trExpanse is null" : "trExpanse is not null"));
        if (trExpanse == null) {
            return true;
        }

        // NOTE(review): the path is concatenated unquoted into the command; it is built from
        // the ACL root property and a directory name equal to the username - confirm neither
        // can contain whitespace or shell metacharacters.
        String cmd = "chmod -R g=rwx " + inbound;
        String login = trExpanse.getParameters().get(LOGIN);
        log.info("chomodInbound() Expanse login = " + login);
        if (login == null || login.isEmpty()) {
            return true;
        }

        try {
            GsiSSHProcessRunner runner = new GsiSSHProcessRunner();
            HashMap<String, String> cfg = new HashMap<String, String>();
            cfg.put(LOGIN, login);
            if (runner.configure(cfg)) {
                Future<Integer> result = runner.start(cmd);
                log.info("chomodInbound() Command to run:" + cmd);
                int exitCode = result.get();
                if (exitCode == 0) {
                    log.info("chomodInbound() exitCode = " + exitCode);
                } else {
                    log.error("chomodInbound() error occurred while running this command: " + cmd);
                    log.error("exit code is: " + exitCode);
                    // NOTE(review): the label says stderr but the value logged is stdout.
                    log.error("stderr = " + runner.getStdOut());
                    return false;
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.error("chomodInbound() exception thrown while running this command: " + cmd);
            log.error("Exception message = " + e.getMessage(), e);
            return false;
        } catch (Exception e) {
            log.error("chomodInbound() exception thrown while running this command: " + cmd);
            log.error("Exception message = " + e.getMessage(), e);
            return false;
        }
        return true;
    }

    /**
     * Finds the completed transfer directories of the configured user.
     *
     * @return null when the ACL root cannot be listed or has no directory named after the
     *         user; otherwise a map with a single entry for the user, whose value maps
     *         (destination folder label, source directory) to the files under that directory
     */
    private Map<String, Map<Pair<String, String>, List<File>>> findResultsReady() {
        String[] allUserNames = new File(m_acl_root).list();
        if (allUserNames == null) {
            return null;
        }

        boolean userDirExists = false;
        for (String u : allUserNames) {
            if (u.equals(username)) {
                userDirExists = true;
                break;
            }
        }
        if (!userDirExists) {
            return null;
        }

        Map<String, Map<Pair<String, String>, List<File>>> filesToBeUploaded =
                new HashMap<String, Map<Pair<String, String>, List<File>>>();
        filesToBeUploaded.put(username, scanInbound(username));
        return filesToBeUploaded;
    }

    /**
     * Scans {@code <acl root>/<name>/inbound/<folder>/<dir>} and keeps every {@code dir} whose
     * content is complete.
     *
     * @param name user directory name under the ACL root
     * @return map of (folder label, transfer directory path) to the regular files found there;
     *         empty when nothing is ready
     */
    private Map<Pair<String, String>, List<File>> scanInbound(String name) {
        Map<Pair<String, String>, List<File>> ready = new HashMap<Pair<String, String>, List<File>>();

        String inbound = m_acl_root + "/" + name + "/inbound";
        if (!chmodInbound(inbound)) {
            log.warn("Could not adjust permissions on " + inbound + "; scanning it anyway");
        }

        String[] folders = new File(inbound).list();
        if (folders == null) {
            return ready;
        }

        for (String folder : folders) {
            String folderPath = inbound + "/" + folder;
            File folderDir = new File(folderPath);
            String[] transferDirs = folderDir.list();
            if (transferDirs == null) {
                continue;
            }

            for (String dir : transferDirs) {
                long expectedSize = parseExpectedSize(folderDir, dir);
                if (expectedSize <= 0) {
                    // Name does not carry a usable size: never eligible, so skip the walk.
                    continue;
                }

                String to = folderPath + "/" + dir;
                List<File> files;
                try {
                    files = collectRegularFiles(to);
                } catch (IOException | UncheckedIOException e) {
                    // Skip only this directory; the others can still be imported.
                    log.error("Caught exception while searching for files that have been transferred ", e);
                    continue;
                }

                long sumOfFileSize = 0L;
                for (File tf : files) {
                    sumOfFileSize += tf.length();
                }
                if (sumOfFileSize >= expectedSize) {
                    ready.put(new Pair<String, String>(folder, to), files);
                }
            }
        }
        return ready;
    }

    /**
     * Extracts the expected total size from a transfer directory name of the form
     * {@code <prefix>_<bytes>}.
     *
     * @param parent directory containing {@code dir}; used for logging only
     * @param dir    transfer directory name
     * @return the parsed size, or 0 when the name does not split into exactly two parts on
     *         an underscore or the second part is not a valid long
     */
    private long parseExpectedSize(File parent, String dir) {
        String[] items = dir.split("\\_");
        if (items.length != 2) {
            return 0L;
        }
        try {
            log.info("Under " + parent.getAbsolutePath() + ", there is dir: " + dir
                    + " and the file size is: " + items[1]);
            return Long.parseLong(items[1]);
        } catch (NumberFormatException e) {
            log.error(e);
            return 0L;
        }
    }

    /**
     * Lists every regular file beneath a path, closing the directory stream afterwards.
     *
     * @param root path to walk
     * @return the regular files found, in traversal order
     * @throws IOException          if the walk cannot be started
     * @throws UncheckedIOException if an I/O error occurs while traversing
     */
    private List<File> collectRegularFiles(String root) throws IOException {
        List<File> files = new ArrayList<File>();
        try (Stream<Path> walk = Files.walk(Paths.get(root))) {
            walk.filter(Files::isRegularFile).forEach(p -> files.add(p.toFile()));
        }
        return files;
    }

    /**
     * Immutable two-value holder used as a map key: key is the destination folder label,
     * value is the source directory path.
     */
    private static final class Pair<K, V> {
        private final K key;
        private final V value;

        public Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public K getKey() {
            return key;
        }

        public V getValue() {
            return value;
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof Pair)) {
                return false;
            }
            Pair<?, ?> that = (Pair<?, ?>) other;
            return Objects.equals(key, that.key) && Objects.equals(value, that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, value);
        }
    }

    /**
     * Stores one file as a new data item in the user's folder with the given label, deletes the
     * source file on success and schedules the item for parsing.
     *
     * @param user   owner of the destination folder
     * @param folder label of the destination folder
     * @param aFile  file to upload
     * @return true if the data item was saved; false if any argument is null, the folder lookup
     *         fails, no folder has that label, or saving throws
     */
    private boolean uploadFile(org.ngbw.sdk.database.User user, String folder, File aFile) {
        if (user == null || folder == null || aFile == null) {
            return false;
        }

        List<Folder> folders;
        try {
            folders = user.findFolders();
        } catch (Exception e) {
            log.error("Exception caught while looking up for folder " + folder, e);
            return false;
        }

        // No break: when several folders share the label, the last one wins (as before).
        Folder destinationFolder = null;
        for (Folder f : folders) {
            if (f.getLabel().equals(folder)) {
                destinationFolder = f;
            }
        }
        if (destinationFolder == null) {
            return false;
        }

        UserDataItem userDataItem;
        try {
            userDataItem = new UserDataItem(destinationFolder);

            // Files above the configured threshold take the local-file path.
            if (aFile.length() > fileSizeLevel) {
                Workbench.convertEncodingForGlobus(aFile);
                userDataItem.setDataFromLocalFile(aFile);
            } else {
                Workbench.convertEncoding(aFile);
                userDataItem.setData(aFile);
            }

            userDataItem.setLabel(aFile.getName());
            userDataItem.setEntityType(EntityType.UNKNOWN);
            userDataItem.setDataType(DataType.UNKNOWN);
            userDataItem.setDataFormat(DataFormat.UNKNOWN);
            userDataItem.setUserAssignedDataFormat(DataFormat.UNKNOWN);
            userDataItem.setValidated(Boolean.FALSE);
            userDataItem.setParsedDate(null);
            userDataItem.setParseStatus(DataParseStatus.UNKNOWN().getId());
            userDataItem.setEnclosingFolder(destinationFolder);
            userDataItem.save();
        } catch (Exception ex) {
            log.error("Exception caught while upload file: " + aFile.getAbsolutePath()
                    + " to folder: " + folder + " for user: " + user.getUsername(), ex);
            return false;
        }

        if (!aFile.delete()) {
            // A file left behind keeps its directory from being cleaned up by process().
            log.warn("Uploaded file could not be deleted: " + aFile.getAbsolutePath());
        }
        scheduleDataParse(userDataItem);
        return true;
    }

    /**
     * Queues a saved data item for parsing when the metadata parser queue is enabled, marking
     * it QUEUED once the scheduler returns a schedule. Failures are logged and otherwise
     * ignored.
     *
     * @param userDataItem item to queue; null is ignored
     */
    private void scheduleDataParse(UserDataItem userDataItem) {
        if (userDataItem == null) {
            return;
        }
        if (!Workbench.getInstance().isMetaDataParserQueueEnabled(Boolean.FALSE)) {
            return;
        }

        try {
            CipresDataParsingScheduler.CipresDataParsingSchedule schedule =
                    CipresDataParsingScheduler.getInstance().schedule(userDataItem);
            if (schedule != null) {
                userDataItem.setParseStatus(DataParseStatus.QUEUED().getId());
                userDataItem.save();
                log.info("Data itemm: " + userDataItem.getName() + " has been queued for parsing.");
            }
        } catch (Exception e) {
            log.error("Exception caught while scheduling the parsing of a data item", e);
        }
    }
}