JAVA运行Kettle的JOB或者转换的总结

xiaoxiao2021-07-05  321

第一步,连接资源库

本例采用的是单例设计模式,这样做的优点是如果资源库已经连接,则不必重新连接

下面直接上代码

package com.qm.util;

import java.io.File; import java.io.IOException;

import org.jeecgframework.core.util.PropertiesUtil; import org.pentaho.di.core.Const; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.plugins.DatabasePluginType; import org.pentaho.di.core.plugins.EnginePluginType; import org.pentaho.di.core.plugins.JobEntryPluginType; import org.pentaho.di.core.plugins.PluginFolder; import org.pentaho.di.core.plugins.RepositoryPluginType; import org.pentaho.di.core.plugins.StepPluginType; import org.pentaho.di.repository.kdr.KettleDatabaseRepository; import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;

public class RepBuilder{          private RepBuilder(){};     private static KettleDatabaseRepository rep=null;     public static KettleDatabaseRepository getInstance() throws KettleException{         if(rep==null){             synchronized(RepBuilder.class){                 if(rep==null){                      //初始化                       //EnvUtil.environmentInit();                     PropertiesUtil p = new PropertiesUtil("etl.properties");                     File   file = new File(p.readProperty("pdi.dir"));//这个path就是jdbc.prtoperties文件的配置路径,指向上层文件夹路径即可,建议是绝对路径。                     String path;                     try {                         path = file.getCanonicalPath();                         Const.JNDI_DIRECTORY = path;                         System.setProperty( "java.naming.factory.initial", "org.osjava.sj.SimpleContextFactory" );                         System.setProperty( "org.osjava.sj.root", path);                         System.setProperty( "org.osjava.sj.delimiter", "/" );                         /*System.setProperty( "KETTLE_HOME", "C:\\Users\\wgq\\.kettle" );*/                     } catch (IOException e) {                         // TODO Auto-generated catch block                         e.printStackTrace();                     }                     //加载插件                     PluginFolder pluginFolder = new PluginFolder(p.readProperty("pdi.plugins"),true,true);                     PluginFolder pluginFolderKaraf = new PluginFolder(p.readProperty("pdi.pluginsKaraf"),true,true);

                    EnginePluginType.getInstance().getPluginFolders().add(pluginFolder);                     JobEntryPluginType.getInstance().getPluginFolders().add(pluginFolder);                     StepPluginType.getInstance().getPluginFolders().add(pluginFolder);                     DatabasePluginType.getInstance().getPluginFolders().add(pluginFolder);                     DatabasePluginType.getInstance().getPluginFolders().add(pluginFolderKaraf);

                    KettleEnvironment.init();                       //数据库连接元对象                                                     DatabaseMeta dataMeta = new DatabaseMeta(p.readProperty("pdi.conName"), p.readProperty("pdi.conType"),                              p.readProperty("pdi.access"),p.readProperty("pdi.hostName"),p.readProperty("pdi.dataBaseName"),                             p.readProperty("pdi.portNumber"),p.readProperty("pdi.username"),p.readProperty("pdi.password"));                     //数据库形式的资源库元对象                       KettleDatabaseRepositoryMeta repInfo = new KettleDatabaseRepositoryMeta();                       //                       repInfo.setConnection(dataMeta);                                              //数据库形式的资源库对象                        KettleDatabaseRepository rep = new KettleDatabaseRepository();                       //用资源库元对象初始化资源库对象                       rep.init(repInfo);                       //连接到资源库                       rep.connect(p.readProperty("rep.username"), p.readProperty("rep.password"));//默认的连接资源库的用户名和密码                       if(rep.isConnected()){                           System.out.println("连接成功");                           return rep;                       }else{                           System.out.println("连接失败");                           return null;                       }                   }             }         }         return rep;     }   

}

下面是etl.properties 的配置仅供参考

rep.name=pdires rep.username=admin rep.password=admin

#pdi.path=D:\\data-integration #pdi.pan=pan.bat #pdi.kitchen=kitchen.bat

pdi.dir=D:\\data-integration\\simple-jndi pdi.conName=pdires pdi.conType=Oracle pdi.access=JNDI pdi.hostName=10.6.12.333 pdi.dataBaseName=pdires pdi.portNumber=1521 pdi.username=pdires pdi.password=pdires pdi.plugins=D:\\data-integration\\plugins pdi.pluginsKaraf=D:\\data-integration\\system\\karaf\\system\\pentaho

然后创建对象Object obj =    RepBuilder.getInstance();

如果obj!=null 则表明连接成功

第二部分,运行job或者转换

 /**       * @author WGQ       * 根据指定的资源库对象及job名称   运行指定的job       * @param filename       * @param parameters       * @param dirName       * @return       * @throws KettleException       * */       public static Result runJob(KettleDatabaseRepository rep,String jobName, String[] parameters, String filename, String dirName) throws Exception{                                RepositoryDirectoryInterface dir = rep.findDirectory(dirName);//根据指定的字符串路径 找到目录                   //加载指定的job                   JobMeta jobMeta = rep.loadJob(rep.getJobId(jobName, dir), null);                   Job job = new Job(rep, jobMeta);                   //设置参数                   if(parameters!=null){                                          for(String param : parameters){                         String key = param.split("=")[0];                         String value = param.substring(param.indexOf("=")+1);                         jobMeta.setParameterValue(key,value);                     }                 }                 if(!"".equals(filename)){                     PropertiesUtil p = new PropertiesUtil("sysConfig.properties");                     jobMeta.setParameterValue("filename", p.readProperty("webUploadpath")+filename);                 }                 job.setLogLevel(LogLevel.BASIC);                   //启动执行指定的job                   job.run();                  Result result = job.getResult();                 job.waitUntilFinished();//等待job执行完;                   job.setFinished(true);                   System.out.println(job.getResult());               return result;     }     /**       * @author WGQ       *        * @param rep  资源库对象       * @param transName 要调用的trans名称       *        * 调用资源库中的trans       * @param filename       * @param parameters       * @param dirName       * @return       * @throws KettleException       *        * */       public static Result runTrans(KettleDatabaseRepository rep,String transName, String[] parameters, String filename, String dirName) throws KettleException{                                  RepositoryDirectoryInterface dir = rep.findDirectory(dirName);//根据指定的字符串路径 找到目录               TransMeta tmeta = rep.loadTransformation(rep.getTransformationID(transName, dir), null);               //设置参数               //tmeta.setParameterValue("", "");               if(parameters!=null){                                  for(String param : parameters){                     String key = param.split("=")[0];                     String value = param.substring(param.indexOf("=")+1);                     tmeta.setParameterValue(key,value);                 }             }             if(!"".equals(filename)){                 PropertiesUtil p = new PropertiesUtil("sysConfig.properties");                 tmeta.setParameterValue("filename", p.readProperty("webUploadpath")+filename);             }             Trans trans = new Trans(tmeta);               trans.setLogLevel(LogLevel.BASIC);              trans.execute(null);//执行trans               trans.waitUntilFinished();               if(trans.getErrors()>0){                   System.out.println("有异常");               }               Result result = trans.getResult();             return result;     } 

这里讲解下参数jobName 和tranName 是job名称和转换名称,fileName 上传的文件的全路径,parameters 参数,dirName 是job或者转换所在的根目录 

转载请注明原文地址: https://www.6miu.com/read-4821437.html

最新回复(0)