Flink Development: Submitting Jobs from Code

Preface

This article uses Flink 1.12.7.

Executing a Flink application involves three components: the Client, the JobManager, and the TaskManagers.

  • The Client submits the application to the cluster
  • The JobManager handles the bookkeeping required while the application runs
  • The TaskManagers execute the actual work of the application

Submitting Jobs from Code

Submitting in Application Mode

Prepare directories and files

hadoop fs -mkdir -p /jar/userTask
hadoop fs -mkdir -p /jar/flink12/libdist
hadoop fs -mkdir -p /jar/flink12/lib

Copy the required files

hadoop fs -put $FLINK_HOME/examples/batch/WordCount.jar /jar/userTask/WordCount.jar
hadoop fs -put $FLINK_HOME/lib/flink-dist_2.12-1.12.7.jar /jar/flink12/libdist/flink-dist_2.12-1.12.7.jar
hadoop fs -put $FLINK_HOME/lib/* /jar/flink12/lib/

You can browse the uploaded files at either of these addresses:

http://hadoop01:50070/explorer.html#/

http://hadoop02:50070/explorer.html#/

Run a quick test on the server:

flink run-application -t yarn-application hdfs://hdfsns/jar/userTask/WordCount.jar --output hdfs://hdfsns/bigdata_study/output03

Add dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-yarn_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.yaml</groupId>
    <artifactId>snakeyaml</artifactId>
    <version>1.30</version>
</dependency>

Code

package cn.psvmc;

import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.yaml.snakeyaml.Yaml;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;

public class StartupFlinkYarn {

    /**
     * Prefix of the Yarn ResourceManager hostname keys
     */
    private static final String YARN_RESOURCE_MANAGER_HOST_NAME = "yarn.resourceManager.hostname.";

    /**
     * Prefix of the Yarn ResourceManager address keys
     */
    private static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourceManager.address.";

    /**
     * Suffix used to filter the Hadoop configuration files
     */
    private static final String CONFIG_XML_SUFFIX = ".xml";

    /**
     * Yarn client
     */
    private static YarnClient yarnClient;

    /**
     * Yarn configuration
     */
    private static YarnConfiguration yarnConfig;

    /**
     * Flink configuration
     */
    private static Configuration flinkConfig;


    /**
     * Initialize all configuration
     *
     * @throws Exception on loading failure
     */
    private static void initiation() throws Exception {
        loadYarnConfig();
        loadYarnClient();
        loadFlinkConfig();
    }

    /**
     * Load the Yarn configuration
     */
    private static void loadYarnConfig() throws MalformedURLException {
        yarnConfig = new YarnConfiguration();

        // Hadoop home directory (usually obtained via System.getenv("HADOOP_HOME"))
        String hadoopPath = "/data/tools/bigdata/hadoop-2.7.7";

        // Directory containing the Hadoop configuration files
        hadoopPath = hadoopPath + "/etc/hadoop";

        File file = new File(hadoopPath);

        // Without the Hadoop configuration files, startup blocks forever with no result
        if (!file.exists()) {
            System.out.println("Hadoop directory does not exist: " + hadoopPath);
            return;
        }

        if (!file.isDirectory()) {
            System.out.println("Hadoop path is a file, not a directory: " + hadoopPath);
            return;
        }

        // Filter the directory for all the xml configuration files
        List<File> configList = Arrays.stream(Objects.requireNonNull(file.listFiles()))
                .filter(file2 -> file2.getName().endsWith(CONFIG_XML_SUFFIX))
                .collect(Collectors.toList());

        // Add each xml file to the Yarn configuration resources
        for (File fileItem : configList) {
            System.out.println("Load Hadoop File: " + fileItem.getAbsolutePath());
            yarnConfig.addResource(fileItem.toURI().toURL());
        }

        // Walk through the configuration entries Yarn has parsed
        for (Map.Entry<String, String> entry : yarnConfig) {
            String key = entry.getKey();
            String value = entry.getValue();

            // Derive the ResourceManager address from the hostname if it is not set
            if (key.startsWith(YARN_RESOURCE_MANAGER_HOST_NAME.toLowerCase())) {
                String rm = key.substring(YARN_RESOURCE_MANAGER_HOST_NAME.length());
                String addressKey = YARN_RESOURCE_MANAGER_ADDRESS.toLowerCase() + rm;
                if (yarnConfig.get(addressKey) == null) {
                    yarnConfig.set(addressKey, value + ":" + YarnConfiguration.DEFAULT_RM_PORT);
                }
            }

            // Initialize the HDFS file system from the fs.defaultFS in the Hadoop configuration,
            // otherwise the hdfs:// scheme cannot be resolved later.
            // Alternatively, put Hadoop's core-site.xml on the classpath and skip this step,
            // or set FileSystem.FS_DEFAULT_NAME_KEY on yarnConfig directly.
            if (key.equals(FileSystem.FS_DEFAULT_NAME_KEY)) {
                System.out.println("init HDFS FileSystem: " + value);
                // Initialize Hadoop's FileSystem
                yarnConfig.set(FileSystem.FS_DEFAULT_NAME_KEY, value);
            }
        }
    }

    /**
     * Load the Flink job configuration
     */
    private static void loadFlinkConfig() throws IOException {
        flinkConfig = new Configuration();

        // Flink home directory (usually obtained via System.getenv("FLINK_HOME"))
        String flinkHome = "/data/tools/bigdata/flink-1.12.7";
        // lib directory under the Flink home
        String flinkLib = flinkHome + "/lib";
        // conf directory under the Flink home
        String flinkConf = flinkHome + "/conf";

        // HDFS location of the jar to run on Yarn; adjust this path as needed
        // (this jar is the Flink job that actually runs)
        String targetJar = "hdfs://hdfsns/jar/userTask/WordCount.jar";

        // HDFS directory holding the other jars your job depends on
        String targetLib = "hdfs://hdfsns/jar/flink12/lib";

        // The dist jar Flink itself needs at runtime; mandatory
        String flinkDistJar = "hdfs://hdfsns/jar/flink12/libdist/flink-dist_2.12-1.12.7.jar";

        // Read Flink's configuration file.
        // This mainly loads the defaults; without it, startup fails with errors such as
        // jobmanager.process.size... not found (you could also add each entry to flinkConfig by hand).
        String flinkConfigFile = flinkConf + "/flink-conf.yaml";

        if (Files.exists(Paths.get(flinkConfigFile))) {
            Yaml yaml = new Yaml();

            FileInputStream fileInputStream = new FileInputStream(flinkConfigFile);

            Map<String, Object> map = yaml.load(fileInputStream);

            for (String key : map.keySet()) {
                flinkConfig.setString(key, map.get(key).toString());
            }

            fileInputStream.close();
        }

        // Restrict the web UI port range; optional
        flinkConfig.setString(RestOptions.BIND_PORT, "20000-40000");


        // Get the Hadoop file system
        FileSystem fileSystem = FileSystem.get(yarnConfig);

        // Upload local files to HDFS; add your own files as needed
        // fileSystem.copyFromLocalFile(new Path("./WordCount.jar"), new Path(targetJar));
        fileSystem.copyFromLocalFile(new Path(flinkLib + "/flink-dist_2.12-1.12.7.jar"), new Path(flinkDistJar));

        // Display name of the application (shown in the Yarn web UI)
        flinkConfig.set(YarnConfigOptions.APPLICATION_NAME, "Test");

        // Use Application mode (fixed)
        flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());

        // The jar to run (HDFS path)
        flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(targetJar));

        // Directory of the jars the job depends on
        // flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(new Path(targetLib).toString()));

        // The dist jar (fixed)
        flinkConfig.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);

        // Classloading order [child-first/parent-first]
        flinkConfig.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");

        // Logging configuration (points at the log config under Flink's conf directory)
        YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfig, flinkConf);
    }

    /**
     * Create and start the Yarn client
     */
    private static void loadYarnClient() {
        yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConfig);
        yarnClient.start();
    }

    /**
     * Create the descriptor used to connect to the Yarn cluster
     *
     * @return the Yarn cluster descriptor
     */
    private static YarnClusterDescriptor createYarnClusterDescriptor() {
        return new YarnClusterDescriptor(
                flinkConfig,
                yarnConfig,
                yarnClient,
                YarnClientYarnClusterInformationRetriever.create(yarnClient),
                false);
    }

    public static void main(String[] args) throws Exception {
        // Initialize all configuration
        initiation();

        // Create the cluster descriptor
        YarnClusterDescriptor descriptor = createYarnClusterDescriptor();

        // Application configuration: the first argument is the args passed to your jar,
        // the second is the fully qualified name of the class containing your jar's main method
        ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(args, null);

        // Memory and slot settings for the job
        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(512).setTaskManagerMemoryMB(1024)
                .setSlotsPerTaskManager(1).createClusterSpecification();

        // Deploy the jar directly to Yarn (submit the job and run it on Yarn).
        // Every call to deployApplicationCluster starts one more application in the Yarn list.
        ClusterClientProvider<ApplicationId> provider = descriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);

        // Get the cluster client
        ClusterClient<ApplicationId> clusterClient = provider.getClusterClient();

        // The Yarn application id
        String applicationId = clusterClient.getClusterId().toString();

        // URL of the Flink web UI
        String webAddress = clusterClient.getWebInterfaceURL();

        System.out.println("ApplicationId: " + applicationId);
        System.out.println("WebInterface: " + webAddress);
    }
}
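
Since every call to deployApplicationCluster leaves a long-running application on Yarn, it helps to also stop one from code. Below is a minimal sketch, reusing createYarnClusterDescriptor() from above; the application id string is hypothetical, substitute the one printed by main() (ConverterUtils is org.apache.hadoop.yarn.util.ConverterUtils, and retrieve throws a checked exception, so run this inside a method that throws Exception):

// Reattach to a running Application-mode cluster and shut it down (sketch).
ApplicationId appId = ConverterUtils.toApplicationId("application_1650000000000_0001"); // hypothetical id
YarnClusterDescriptor descriptor = createYarnClusterDescriptor();
ClusterClient<ApplicationId> client = descriptor.retrieve(appId).getClusterClient();
client.shutDownCluster(); // stops the whole application cluster on Yarn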

View the application in the Yarn console:

http://hadoop02:8088/cluster

Submitting in Yarn-Session Mode

Add the dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-yarn_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Code

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.io.File;
import java.util.concurrent.CompletableFuture;

public class FlinkClient {
    public static void main(String[] args) {
        String jarFilePath = "/data/tools/bigdata/flink-1.12.7/examples/batch/WordCount.jar";
        RestClusterClient<StandaloneClusterId> client = null;
        try {
            // Cluster connection info
            Configuration configuration = new Configuration();
            configuration.setString(JobManagerOptions.ADDRESS, "hadoop02");
            configuration.setInteger(JobManagerOptions.PORT, 6123);
            configuration.setInteger(RestOptions.PORT, 8081);
            client = new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
            int parallelism = 1;
            File jarFile = new File(jarFilePath);
            SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
            PackagedProgram program = PackagedProgram.newBuilder()
                    .setConfiguration(configuration)
                    //.setEntryPointClassName("org.apache.flink.table.examples.java.WordCountSQL")
                    .setJarFile(jarFile)
                    .setSavepointRestoreSettings(savepointRestoreSettings)
                    .build();
            JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
            CompletableFuture<JobID> result = client.submitJob(jobGraph);
            JobID jobId = result.get();
            System.out.println("jobId: " + jobId.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
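
The returned JobID can also be used to track the job with the same client. A minimal sketch (getJobStatus is part of the 1.12 client API; JobStatus lives in org.apache.flink.api.common):

// Poll the status of the submitted job with the same RestClusterClient (sketch).
// Requires: import org.apache.flink.api.common.JobStatus;
JobStatus status = client.getJobStatus(jobId).get();
System.out.println("Job status: " + status);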

Submitting by invoking a script remotely

package cn.psvmc;

import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.sun.istack.logging.Logger;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class ConnectionSSH {
    private static final Logger logger = Logger.getLogger(ConnectionSSH.class);

    public static void main(String[] args) throws JSchException, IOException {
        JSch jsch = new JSch();
        // addIdentity expects the private key, not the .pub file
        String priKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa";
        jsch.addIdentity(priKeyPath);
        String username = "root";
        String host = "192.168.7.101";
        Session session = jsch.getSession(username, host, 22); // prepare the connection
        session.setConfig("StrictHostKeyChecking", "no");
        session.connect();
        String command = "flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
        ChannelExec channel = (ChannelExec) session.openChannel("exec");
        channel.setCommand(command);
        BufferedReader in = new BufferedReader(new InputStreamReader(channel.getInputStream()));
        channel.connect();

        String msg;
        while ((msg = in.readLine()) != null) {
            System.out.println(msg);
        }
        channel.disconnect();
        session.disconnect();
    }
}
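
Note that the example above only reads stdout; when the remote flink command fails, the diagnostics usually land on stderr and are lost. A minimal sketch for draining the error stream too (JSch's ChannelExec exposes it via getErrStream(); create the reader before channel.connect(), like the stdout reader above):

// Also read stderr so remote failures are not silently swallowed (sketch).
BufferedReader err = new BufferedReader(new InputStreamReader(channel.getErrStream()));
String errLine;
while ((errLine = err.readLine()) != null) {
    System.err.println(errLine);
}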

Using a password

JSch jsch = new JSch();
String username = "root";
String host = "192.168.7.101";
Session session = jsch.getSession(username, host, 22); // prepare the connection
session.setConfig("StrictHostKeyChecking", "no");
session.setPassword("zhangjian");
session.connect();

Using a key

JSch jsch = new JSch();
String priKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa"; // private key, not the .pub file
jsch.addIdentity(priKeyPath);
String username = "root";
String host = "192.168.7.101";
Session session = jsch.getSession(username, host, 22); // prepare the connection
session.setConfig("StrictHostKeyChecking", "no");
session.connect();

Submitting by invoking a script remotely (2)

Besides running scripts, this class can also copy files.

Dependency:

<dependency>
    <groupId>ch.ethz.ganymed</groupId>
    <artifactId>ganymed-ssh2</artifactId>
    <version>build210</version>
</dependency>

Utility class

package cn.psvmc;

import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.SCPClient;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

import java.io.*;

/**
 * Connects to a Linux server and runs shell commands on it
 */
public class ConnectLinuxCommand {
    private static final Logger logger = Logger.getLogger(ConnectLinuxCommand.class);

    private static final String DEFAULTCHARTSET = "UTF-8";
    private static Connection conn;

    /**
     * @Title: login
     * @Description: log in to the Linux server with username and password
     * @return: Boolean
     */
    public static Boolean login(RemoteConnect remoteConnect) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect(); // connect
            flag = conn.authenticateWithPassword(remoteConnect.getUserName(), remoteConnect.getPassword()); // authenticate
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    public static Boolean loginWithoutPwd(RemoteConnect remoteConnect) {
        boolean flag = true;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect(); // connect
            boolean authenticationPartialSuccess = conn.isAuthenticationPartialSuccess();
            System.out.println("authenticationPartialSuccess = " + authenticationPartialSuccess);
            logger.info("Authentication succeeded!");
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * @param remoteConnect connection info object
     * @param keyFile       a File pointing to a file containing the user's DSA or RSA private key in
     *                      OpenSSH key format (PEM; the "-----BEGIN DSA PRIVATE KEY-----" or
     *                      "-----BEGIN RSA PRIVATE KEY-----" marker must be present)
     * @param keyfilePass   passphrase used to decrypt the key file if it is encrypted; may be null otherwise
     * @return Boolean
     * @Title: loginByKey
     * @Description: log in to the Linux server with a key file
     */
    public static Boolean loginByFileKey(RemoteConnect remoteConnect, File keyFile, String keyfilePass) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            // authenticate
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keyFile, keyfilePass);
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * @param remoteConnect connection info object
     * @param keys          a char[] containing the user's DSA or RSA private key in OpenSSH key format
     *                      (the "-----BEGIN DSA PRIVATE KEY-----" or "-----BEGIN RSA PRIVATE KEY-----"
     *                      marker must be present); the char array may contain newlines
     * @param keyPass       passphrase used to decrypt the key if it is encrypted; may be null otherwise
     * @return Boolean
     * @Title: loginByCharsKey
     * @Description: log in to the Linux server with a key held in memory
     */
    public static Boolean loginByCharsKey(RemoteConnect remoteConnect, char[] keys, String keyPass) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            // authenticate
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keys, keyPass);
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * @param cmd the shell command
     * @Title: execute
     * @Description: run a shell script or command remotely
     * @return: result returned once the command finishes
     */
    public static String runCmd(String cmd) {
        String result = "";
        try {
            Session session = conn.openSession(); // open a session
            session.execCommand(cmd); // run the command
            result = processStdout(session.getStdout(), DEFAULTCHARTSET);
            // An empty stdout means the script failed
            if (StringUtils.isBlank(result)) {
                result = processStdout(session.getStderr(), DEFAULTCHARTSET);
            }
            session.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * @return String the result of the command on success; an empty string (not null) on failure
     * @Title: executeSuccess
     * @Description: run a shell script or command remotely
     */
    public static String runCmdSuccess(String cmd) {
        String result = "";
        try {
            Session session = conn.openSession(); // open a session
            session.execCommand(cmd); // run the command
            result = processStdout(session.getStdout(), DEFAULTCHARTSET);
            session.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * @param in      input stream
     * @param charset encoding
     * @return String the output as plain text
     * @Title: processStdout
     * @Description: parse the output of the executed script
     */
    public static String processStdout(InputStream in, String charset) {
        InputStream stdout = new StreamGobbler(in);
        StringBuilder buffer = new StringBuilder();
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(stdout, charset));
            String line = null;
            while ((line = br.readLine()) != null) {
                buffer.append(line).append("\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return buffer.toString();
    }

    /**
     * @return String
     * @Description: connect to the Linux server with username and password and run a command
     */
    public static String runCmd(String ip, String userName, String password, String commandStr) {
        logger.info(
                "ConnectLinuxCommand runCmd===" +
                        "ip:" + ip +
                        " userName:" + userName +
                        " commandStr:" + commandStr
        );

        String returnStr = "";
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        try {
            if (login(remoteConnect)) {
                returnStr = runCmd(commandStr);
                System.out.println(returnStr);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return returnStr;
    }

    public static boolean connectLinuxWithoutPwd(String ip, String userName, String commandStr) {
        logger.info("ConnectLinuxCommand connectLinuxWithoutPwd===" + "ip:" + ip + " userName:" + userName + " commandStr:"
                + commandStr);

        String returnStr = "";
        boolean result = true;
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        try {
            if (loginWithoutPwd(remoteConnect)) {
                returnStr = runCmd(commandStr);
                System.out.println(returnStr);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (StringUtils.isBlank(returnStr)) {
            result = false;
        }
        return result;
    }

    /**
     * @param password   password (remote server)
     * @param remoteFile file location (remote server)
     * @param localDir   directory on this server
     * @Title: scpGet
     * @Description: fetch a file from a remote server into the given local directory
     */
    public static void scpPull(String ip, String userName, String password, String remoteFile, String localDir)
            throws IOException {

        logger.info("ConnectLinuxCommand scpGet===" + "ip:" + ip + " userName:" + userName + " remoteFile:"
                + remoteFile + " localDir:" + localDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.get(remoteFile, localDir);
            conn.close();
        }
    }


    /**
     * Copy a file to another machine
     *
     * @param ip        remote IP
     * @param userName  remote username
     * @param password  remote password
     * @param localFile local file
     * @param remoteDir remote directory
     * @throws IOException on failure
     */
    public static void scpPush(String ip, String userName, String password, String localFile, String remoteDir)
            throws IOException {
        logger.info("ConnectLinuxCommand scpPut===" + "ip:" + ip + " userName:" + userName + " localFile:"
                + localFile + " remoteDir:" + remoteDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.put(localFile, remoteDir);
            conn.close();
        }
    }
}

RemoteConnect

public class RemoteConnect {
    String ip;
    String userName;
    String password;

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

Testing

package cn.psvmc;

public class CLCTest {
    public static void main(String[] args) {
        mTest1();
    }

    public static void mTest1() {
        System.out.println("--------------------------------------");
        String commandStr = "flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
        String result = ConnectLinuxCommand.runCmd("192.168.7.101", "root", "zhangjian", commandStr);
        System.out.println("Result: " + result);
        System.out.println("--------------------------------------");
    }

    public static void mTest2() {
        try {
            ConnectLinuxCommand.scpPull("192.168.7.101", "root", "zhangjian", "/root/test.txt", "d:/aa");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void mTest3() {
        try {
            ConnectLinuxCommand.scpPush("192.168.7.101", "root", "zhangjian", "d:/aa/test2.txt", "/root/");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}