第一步,连接资源库
本例采用的是单例设计模式,这样做的优点是如果资源库已经连接,则不必重新连接
下面直接上代码
package com.qm.util;
import java.io.File; import java.io.IOException;
import org.jeecgframework.core.util.PropertiesUtil; import org.pentaho.di.core.Const; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.plugins.DatabasePluginType; import org.pentaho.di.core.plugins.EnginePluginType; import org.pentaho.di.core.plugins.JobEntryPluginType; import org.pentaho.di.core.plugins.PluginFolder; import org.pentaho.di.core.plugins.RepositoryPluginType; import org.pentaho.di.core.plugins.StepPluginType; import org.pentaho.di.repository.kdr.KettleDatabaseRepository; import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
public class RepBuilder{ private RepBuilder(){}; private static KettleDatabaseRepository rep=null; public static KettleDatabaseRepository getInstance() throws KettleException{ if(rep==null){ synchronized(RepBuilder.class){ if(rep==null){ //初始化 //EnvUtil.environmentInit(); PropertiesUtil p = new PropertiesUtil("etl.properties"); File file = new File(p.readProperty("pdi.dir"));//这个path就是jdbc.prtoperties文件的配置路径,指向上层文件夹路径即可,建议是绝对路径。 String path; try { path = file.getCanonicalPath(); Const.JNDI_DIRECTORY = path; System.setProperty( "java.naming.factory.initial", "org.osjava.sj.SimpleContextFactory" ); System.setProperty( "org.osjava.sj.root", path); System.setProperty( "org.osjava.sj.delimiter", "/" ); /*System.setProperty( "KETTLE_HOME", "C:\\Users\\wgq\\.kettle" );*/ } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } //加载插件 PluginFolder pluginFolder = new PluginFolder(p.readProperty("pdi.plugins"),true,true); PluginFolder pluginFolderKaraf = new PluginFolder(p.readProperty("pdi.pluginsKaraf"),true,true);
EnginePluginType.getInstance().getPluginFolders().add(pluginFolder); JobEntryPluginType.getInstance().getPluginFolders().add(pluginFolder); StepPluginType.getInstance().getPluginFolders().add(pluginFolder); DatabasePluginType.getInstance().getPluginFolders().add(pluginFolder); DatabasePluginType.getInstance().getPluginFolders().add(pluginFolderKaraf);
KettleEnvironment.init(); //数据库连接元对象 DatabaseMeta dataMeta = new DatabaseMeta(p.readProperty("pdi.conName"), p.readProperty("pdi.conType"), p.readProperty("pdi.access"),p.readProperty("pdi.hostName"),p.readProperty("pdi.dataBaseName"), p.readProperty("pdi.portNumber"),p.readProperty("pdi.username"),p.readProperty("pdi.password")); //数据库形式的资源库元对象 KettleDatabaseRepositoryMeta repInfo = new KettleDatabaseRepositoryMeta(); // repInfo.setConnection(dataMeta); //数据库形式的资源库对象 KettleDatabaseRepository rep = new KettleDatabaseRepository(); //用资源库元对象初始化资源库对象 rep.init(repInfo); //连接到资源库 rep.connect(p.readProperty("rep.username"), p.readProperty("rep.password"));//默认的连接资源库的用户名和密码 if(rep.isConnected()){ System.out.println("连接成功"); return rep; }else{ System.out.println("连接失败"); return null; } } } } return rep; }
}
下面是 etl.properties 的配置,仅供参考:
rep.name=pdires
rep.username=admin
rep.password=admin

#pdi.path=D:\\data-integration
#pdi.pan=pan.bat
#pdi.kitchen=kitchen.bat

pdi.dir=D:\\data-integration\\simple-jndi
pdi.conName=pdires
pdi.conType=Oracle
pdi.access=JNDI
pdi.hostName=10.6.12.333
pdi.dataBaseName=pdires
pdi.portNumber=1521
pdi.username=pdires
pdi.password=pdires
pdi.plugins=D:\\data-integration\\plugins
pdi.pluginsKaraf=D:\\data-integration\\system\\karaf\\system\\pentaho
然后获取资源库对象:KettleDatabaseRepository rep = RepBuilder.getInstance();
如果 rep != null,则表明连接成功
第二步,运行job或者转换
/** * @author WGQ * 根据指定的资源库对象及job名称 运行指定的job * @param filename * @param parameters * @param dirName * @return * @throws KettleException * */ public static Result runJob(KettleDatabaseRepository rep,String jobName, String[] parameters, String filename, String dirName) throws Exception{ RepositoryDirectoryInterface dir = rep.findDirectory(dirName);//根据指定的字符串路径 找到目录 //加载指定的job JobMeta jobMeta = rep.loadJob(rep.getJobId(jobName, dir), null); Job job = new Job(rep, jobMeta); //设置参数 if(parameters!=null){ for(String param : parameters){ String key = param.split("=")[0]; String value = param.substring(param.indexOf("=")+1); jobMeta.setParameterValue(key,value); } } if(!"".equals(filename)){ PropertiesUtil p = new PropertiesUtil("sysConfig.properties"); jobMeta.setParameterValue("filename", p.readProperty("webUploadpath")+filename); } job.setLogLevel(LogLevel.BASIC); //启动执行指定的job job.run(); Result result = job.getResult(); job.waitUntilFinished();//等待job执行完; job.setFinished(true); System.out.println(job.getResult()); return result; } /** * @author WGQ * * @param rep 资源库对象 * @param transName 要调用的trans名称 * * 调用资源库中的trans * @param filename * @param parameters * @param dirName * @return * @throws KettleException * * */ public static Result runTrans(KettleDatabaseRepository rep,String transName, String[] parameters, String filename, String dirName) throws KettleException{ RepositoryDirectoryInterface dir = rep.findDirectory(dirName);//根据指定的字符串路径 找到目录 TransMeta tmeta = rep.loadTransformation(rep.getTransformationID(transName, dir), null); //设置参数 //tmeta.setParameterValue("", ""); if(parameters!=null){ for(String param : parameters){ String key = param.split("=")[0]; String value = param.substring(param.indexOf("=")+1); tmeta.setParameterValue(key,value); } } if(!"".equals(filename)){ PropertiesUtil p = new PropertiesUtil("sysConfig.properties"); tmeta.setParameterValue("filename", p.readProperty("webUploadpath")+filename); } Trans trans = new Trans(tmeta); 
trans.setLogLevel(LogLevel.BASIC); trans.execute(null);//执行trans trans.waitUntilFinished(); if(trans.getErrors()>0){ System.out.println("有异常"); } Result result = trans.getResult(); return result; }
这里讲解一下参数:jobName 和 transName 分别是 job 名称和转换名称;filename 是上传文件的文件名(代码会在它前面拼接 sysConfig.properties 中 webUploadpath 配置的上传目录,得到完整路径);parameters 是形如 key=value 的参数数组;dirName 是 job 或者转换在资源库中所在的目录路径