Put the MySQL JDBC connector on the Hadoop classpath:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:~/lib/mysql-connector-java-5.1.28-bin.jar

Run locally: read data from MySQL and process it with MapReduce.

Note: the record class must implement not only DBWritable but also Writable! DBInputFormat uses DBWritable to read rows over JDBC, while Hadoop needs Writable to serialize the value between map and reduce.

DeptWritable.java

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * author: 若泽数据-PK哥
 * discussion group: 545916944
 */
public class DeptWritable implements Writable, DBWritable {
    private int deptid;
    private int deptno;
    private String dname;
    private String loc;

    public DeptWritable() {}

    public DeptWritable(int deptid, int deptno, String dname, String loc) {
        this.deptid = deptid;
        this.deptno = deptno;
        this.dname = dname;
        this.loc = loc;
    }

    public int getDeptid() {
        return deptid;
    }

    public void setDeptid(int deptid) {
        this.deptid = deptid;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }

    public String getDname() {
        return dname;
    }

    public void setDname(String dname) {
        this.dname = dname;
    }

    public String getLoc() {
        return loc;
    }

    public void setLoc(String loc) {
        this.loc = loc;
    }

    // DBWritable: write the fields into a JDBC PreparedStatement (used when writing to the DB)
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, deptid);
        statement.setInt(2, deptno);
        statement.setString(3, dname);
        statement.setString(4, loc);
    }

    // DBWritable: read the fields from a JDBC ResultSet (used when reading from the DB)
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        deptid = resultSet.getInt(1);
        deptno = resultSet.getInt(2);
        dname = resultSet.getString(3);
        loc = resultSet.getString(4);
    }

    // Writable: Hadoop serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(deptid);
        out.writeInt(deptno);
        out.writeUTF(dname);
        out.writeUTF(loc);
    }

    // Writable: Hadoop deserialization, reading the fields in the same order
    @Override
    public void readFields(DataInput in) throws IOException {
        this.deptid = in.readInt();
        this.deptno = in.readInt();
        this.dname = in.readUTF();
        this.loc = in.readUTF();
    }

    @Override
    public String toString() {
        return deptid + "\t"
                + deptno + "\t"
                + dname + "\t"
                + loc;
    }
}

MySQLReadDriver.java (this version runs locally, but does not run on the server when packaged as a thin jar)

import com.ruozedata.bigdata.hadoop.utils.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MySQLReadDriver {

    public static void main(String[] args) throws Exception {
        String output = "out";

        // 1) Get the Job object
        Configuration configuration = new Configuration();
        // configuration.set(DBConfiguration.DRIVER_CLASS_PROPERTY, "com.mysql.jdbc.Driver");
        DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver", "jdbc:mysql://hadoop000:3306/sqoop", "hadoop", "hadoop");

        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);

        // 2) Set the main class of this job
        job.setJarByClass(MySQLReadDriver.class);

        // 3) Set the Mapper (map-only job, so no Reducer)
        job.setMapperClass(MyMapper.class);

        // 4) Set the output key/value types of the map phase
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(DeptWritable.class);

        // 6) Set the input (MySQL table and columns) and the output path
        String[] fields = {"id", "deptno", "dname", "loc"};
        DBInputFormat.setInput(job, DeptWritable.class, "dept", null, null, fields);

        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7) Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    public static class MyMapper extends Mapper<LongWritable, DeptWritable, NullWritable, DeptWritable> {

        @Override
        protected void map(LongWritable key, DeptWritable value, Context context) throws IOException, InterruptedException {
            context.write(NullWritable.get(), value);
        }
    }
}
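
FileUtils.deleteOutput comes from the course's own utils package and is not shown in this post. A minimal sketch of what such a helper might look like (hypothetical; the real class may differ):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileUtils {

    // Delete the output directory if it already exists, so the job can be re-run
    // without hitting "Output directory ... already exists".
    public static void deleteOutput(Configuration configuration, String output) throws Exception {
        FileSystem fileSystem = FileSystem.get(configuration);
        Path outputPath = new Path(output);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true); // true = recursive delete
        }
    }
}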

MySQLReadDriverV2.java (this version does run on the server when packaged as a thin jar)

Key difference: the driver extends Configured implements Tool and is launched through ToolRunner, which parses Hadoop's generic options, so the MySQL JDBC driver jar can be shipped to the cluster with -libjars at submit time.

import com.ruozedata.bigdata.hadoop.utils.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MySQLReadDriverV2 extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new MySQLReadDriverV2(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws Exception {

        String output = "out";

        // 1) Get the Job object; use the Configuration managed by ToolRunner
        Configuration configuration = super.getConf();
        // configuration.set(DBConfiguration.DRIVER_CLASS_PROPERTY, "com.mysql.jdbc.Driver");
        DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver", "jdbc:mysql://hadoop000:3306/sqoop", "hadoop", "4WOzuishuai");

        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);

        // 2) Set the main class of this job
        job.setJarByClass(MySQLReadDriverV2.class);

        // 3) Set the Mapper (map-only job, so no Reducer)
        job.setMapperClass(MyMapper.class);

        // 4) Set the output key/value types of the map phase
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(DeptWritable.class);

        // 6) Set the input (MySQL table and columns) and the output path
        String[] fields = {"id", "deptno", "dname", "loc"};
        DBInputFormat.setInput(job, DeptWritable.class, "dept", null, null, fields);

        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7) Submit the job and return a proper exit code (0 on success, 1 on failure)
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    public static class MyMapper extends Mapper<LongWritable, DeptWritable, NullWritable, DeptWritable> {

        @Override
        protected void map(LongWritable key, DeptWritable value, Context context) throws IOException, InterruptedException {
            context.write(NullWritable.get(), value);
        }
    }

}
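
With ToolRunner parsing the generic options, the thin jar can be submitted on the server and the MySQL driver shipped alongside it via -libjars. A hedged example; the jar name and the fully qualified class name are placeholders, adjust them to your own build:

hadoop jar ruozedata-hadoop.jar com.ruozedata.bigdata.MySQLReadDriverV2 \
    -libjars ~/lib/mysql-connector-java-5.1.28-bin.jar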

The steps below were carried out successfully on a CentOS 7.2 server. Only the binary (tarball) installation is covered here; the RPM and source-build methods are not.

1. First, remove the bundled MariaDB

Older Linux distributions shipped MySQL by default, but after the Sun (and later Oracle) acquisition and the licensing changes it was dropped from these open-source distributions, so it now has to be installed by hand. CentOS 7 bundles MariaDB instead of MySQL, and installing MySQL conflicts with MariaDB's files, so MariaDB must be removed first.

Commands:

# yum search mysql    // check whether mariadb.x86_64 is present
# yum remove mariadb-libs.x86_64

2. Download MySQL

Package: mysql-5.6.47-linux-glibc2.12-x86_64.tar.gz

// download
# wget https://dev.mysql.com/get/Downloads/MySQL-5.6/mysql-5.6.47-linux-glibc2.12-x86_64.tar.gz
// unpack
# tar -zxvf mysql-5.6.47-linux-glibc2.12-x86_64.tar.gz
// copy the unpacked directory to /usr/local/mysql
# cp -r mysql-5.6.47-linux-glibc2.12-x86_64 /usr/local/mysql

3. Add the mysql group and user

// add the group
# groupadd mysql
// add user mysql to the mysql group
# useradd -g mysql mysql

4. Install

# cd /usr/local/mysql/
# mkdir ./data/mysql
# chown -R mysql:mysql ./
# ./scripts/mysql_install_db --user=mysql --datadir=/usr/local/mysql/data/mysql
# cp support-files/mysql.server /etc/init.d/mysqld
# chmod 755 /etc/init.d/mysqld
# cp support-files/my-default.cnf /etc/my.cnf

// edit the startup script
# vi /etc/init.d/mysqld

// items to change:
basedir=/usr/local/mysql/
datadir=/usr/local/mysql/data/mysql

// start the service
# service mysqld start

// add mysql to the environment: append this line to /etc/profile so the mysql command works anywhere
export PATH=$PATH:/usr/local/mysql/bin
# source /etc/profile

// start mysql
# service mysqld start
// stop mysql
# service mysqld stop
// check the status
# service mysqld status

Possible problems:

Problem 1: FATAL ERROR: please install the following Perl modules before executing

Cause: the autoconf package is missing

Solution:

yum -y install autoconf

Problem 2: Installing MySQL system tables… ./bin/mysqld: error while loading shared libraries: libaio.so.1: cannot open shared object file: No such file or directory

Cause: the libaio library files are missing

Solution:

yum -y install libaio*

Access.java

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Custom serializable (Writable) class
 */
public class Access implements Writable {

    private String phone;
    private long up;
    private long down;
    private long sum;

    @Override
    public String toString() {
        return phone + '\t' + up + '\t' + down + '\t' + sum;
    }

    public Access() {}

    public Access(String phone, long up, long down) {
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum = up + down;
    }

    // Serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(phone);
        dataOutput.writeLong(up);
        dataOutput.writeLong(down);
        dataOutput.writeLong(sum);
    }

    // Deserialization: read the fields back in the same order
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.phone = dataInput.readUTF();
        this.up = dataInput.readLong();
        this.down = dataInput.readLong();
        this.sum = dataInput.readLong();
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public long getUp() {
        return up;
    }

    public void setUp(long up) {
        this.up = up;
    }

    public long getDown() {
        return down;
    }

    public void setDown(long down) {
        this.down = down;
    }

    public long getSum() {
        return sum;
    }

    public void setSum(long sum) {
        this.sum = sum;
    }
}

SerDriver.java

import com.ruozedata.bigdata.hadoop.utils.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * The driver boilerplate is always the same.
 */
public class SerDriver {

    public static void main(String[] args) throws Exception {

        // 1) Get the Job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2) Set the main class of this job
        job.setJarByClass(SerDriver.class);

        // 3) Set the Mapper and Reducer
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // 4) Set the output key/value types of the map phase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Access.class);

        // 5) Set the output key/value types of the reduce phase
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Access.class);

        // 6) Set the input and output paths
        String input = "data/access.log";
        String output = "out";

        FileUtils.deleteOutput(configuration, output);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7) Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Access> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split("\t");

            // phone number (indices start at 0)
            String phone = splits[1];

            // upstream traffic
            long up = Long.parseLong(splits[splits.length - 3]);

            // downstream traffic
            long down = Long.parseLong(splits[splits.length - 2]);

            // emit
            // Access access = new Access();
            // access.setPhone(phone);
            // access.setUp(up);
            // access.setDown(down);
            // access.setSum(up + down);

            context.write(new Text(phone), new Access(phone, up, down));
        }
    }

    public static class MyReducer extends Reducer<Text, Access, NullWritable, Access> {

        @Override
        protected void reduce(Text phone, Iterable<Access> values, Context context) throws IOException, InterruptedException {
            long ups = 0;
            long downs = 0;

            // Iterator<Access> iterator = values.iterator();
            // while (iterator.hasNext()) {
            //     Access next = iterator.next();
            // }

            for (Access access : values) {
                ups += access.getUp();
                downs += access.getDown();
            }

            context.write(NullWritable.get(), new Access(phone.toString(), ups, downs));
        }
    }
}

HDFSAPITest2.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.URI;

public class HDFSAPITest2 {
    FileSystem fileSystem;

    @Before
    public void setUp() throws Exception {
        URI uri = new URI("hdfs://hadoop:8020");
        Configuration configuration = new Configuration();

        configuration.set("dfs.client.use.datanode.hostname", "true");
        configuration.set("dfs.replication", "1");
        // 1) Get an HDFS client
        fileSystem = FileSystem.get(uri, configuration, "root");
    }

    @After
    public void shutdown() throws Exception {
        // 3) Release resources
        if (fileSystem != null) {
            fileSystem.close();
        }
    }

    @Test
    public void mkdir() throws Exception {
        fileSystem.mkdirs(new Path("hdfs://hadoop:8020/hdfstest0"));
    }

    @Test
    public void copyFromLocalFile() throws Exception {
        Path src = new Path("data/ruozedata.txt");
        Path dst = new Path("/hdfstest");
        fileSystem.copyFromLocalFile(src, dst);
    }

    @Test
    public void copyToLocalFile() throws Exception {
        Path src = new Path("/20201011/146.txt");
        Path dst = new Path("output/146.txt");
        fileSystem.copyToLocalFile(src, dst);
    }

    @Test
    public void rename() throws Exception {
        Path src = new Path("/20201011/20201011-0.txt");
        Path dst = new Path("/20201011/146.txt");
        fileSystem.rename(src, dst);
    }

    @Test
    public void listFiles() throws Exception {
        RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path("/20201011"), true);

        while (files.hasNext()) {
            LocatedFileStatus fileStatus = files.next();
            String path = fileStatus.getPath().toString();
            long len = fileStatus.getLen();
            short replication = fileStatus.getReplication();
            FsPermission permission = fileStatus.getPermission();
            String isDir = fileStatus.isDirectory() ? "directory" : "file";

            System.out.println(path + "\t" + len + "\t" + replication + "\t" + permission + "\t" + isDir);

            // block locations of this file
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            for (BlockLocation location : blockLocations) {
                String[] hosts = location.getHosts();
                for (String host : hosts) {
                    System.out.println(host + "........");
                }
            }
        }
    }

    @Test
    public void delete() throws Exception {
        fileSystem.delete(new Path("/hdfstest"), true);
    }
}

Official documentation: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html

This article follows the official description, translating it and filling in some details.

Hadoop deployment

Similar to the JDK deployment in the previous chapter.

tar -zxvf hadoop-2.6.0-cdh5.16.2.tar.gz -C ~/app/

Common directories in the Hadoop package:
bin:   hadoop client commands
etc:   hadoop configuration files
sbin:  scripts that start/stop the hadoop daemons
share: common examples

Configure .bash_profile:
export HADOOP_HOME=/home/pearfl/app/hadoop-2.6.0-cdh5.16.2
export PATH=$HADOOP_HOME/bin:$PATH
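
After editing .bash_profile, reload it so the hadoop command is available in the current shell (a quick sanity check follows):

source ~/.bash_profile
hadoop version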

Edit the relevant configuration files

hadoop-env.sh: JAVA_HOME must be set (see the example after this block)
core-site.xml: replace localhost in localhost:8020 with your own hostname

core-site.xml
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop000:8020</value>
    </property>
</configuration>

hdfs-site.xml
hadoop.tmp.dir is overridden because the default location under /tmp is cleared every time the machine reboots
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/pearfl/app/tmp</value>
    </property>
</configuration>

slaves
mxh (use any name you like, but it must match the hostname)
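
For reference, the JAVA_HOME line in hadoop-env.sh could look like the following; the JDK path is an assumption, point it at your own installation:

export JAVA_HOME=/usr/java/jdk1.8.0_181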

Start HDFS: the very first time, format the filesystem; do not run the format again afterwards.

hdfs namenode -format

Start the cluster

$HADOOP_HOME/sbin/start-dfs.sh
Verify:
# jps
8148 NameNode
8245 DataNode
8540 SecondaryNameNode
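
Beyond jps, a quick smoke test of HDFS; a hedged example, assuming the hostname hadoop000 from the core-site.xml above:

hdfs dfs -mkdir -p /test
hdfs dfs -ls /
// NameNode web UI (Hadoop 2.x): http://hadoop000:50070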