Preface
In the data-synchronization space I have worked with four tools: Sqoop, DataX, HData, and FileSync. Digging into their internal implementations, each has its own clever ideas, and I learned a lot from all of them. Sqoop1 and Sqoop2 share the same underlying synchronization principles, so I chose the Sqoop1 source code as the entry point for analysis. DataX and HData are architecturally very similar; HData borrows some of Disruptor's internals for data transfer, and the Disruptor documentation makes that design genuinely intriguing. That is good material to study later, so here I go with DataX's more conventional implementation. FileSync is a tool developed inside our company for synchronizing remote files with HDFS.
Sqoop
Import
Sqoop offers two ways to synchronize data from a database to HDFS: 1) a JDBC connection; 2) the database vendor's own client tools.
Taking MySQL as the example, the synchronization process consists of three steps: 1) split the data; 2) read the data; 3) write the data.
JDBC
Splitting uses a generic approach: a COUNT aggregate query returns the number of rows to synchronize (nums), the configured number of map tasks (m) is read from the job, and nums / m is the number of rows each map task handles. See the code below:
// 1. Split the data.
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  ...
  statement = connection.createStatement();
  // Obtain the total number of rows (nums).
  results = statement.executeQuery(getCountQuery());
  results.next();
  long count = results.getLong(1);
  int chunks = ConfigurationHelper.getJobNumMaps(job);
  long chunkSize = (count / chunks);
  ...
  for (int i = 0; i < chunks; i++) {
    DBInputSplit split;
    if ((i + 1) == chunks) {
      // The last chunk picks up any remainder.
      split = new DBInputSplit(i * chunkSize, count);
    } else {
      split = new DBInputSplit(i * chunkSize, (i * chunkSize) + chunkSize);
    }
    splits.add(split);
  }
  ...
}

protected String getCountQuery() {
  if (dbConf.getInputCountQuery() != null) {
    return dbConf.getInputCountQuery();
  }
  StringBuilder query = new StringBuilder();
  query.append("SELECT COUNT(*) FROM " + tableName);
  if (conditions != null && conditions.length() > 0) {
    query.append(" WHERE " + conditions);
  }
  return query.toString();
}

// 2. Read the data from the database.
// The SELECT statement is built from the constructor arguments; Sqoop has three
// variants: Oracle, DB2, and the generic one shown here.
protected String getSelectQuery() {
  StringBuilder query = new StringBuilder();

  // Default codepath for MySQL, HSQLDB, etc.
  // Relies on LIMIT/OFFSET for splits.
  if (dbConf.getInputQuery() == null) {
    query.append("SELECT ");
    for (int i = 0; i < fieldNames.length; i++) {
      query.append(fieldNames[i]);
      if (i != fieldNames.length - 1) {
        query.append(", ");
      }
    }
    query.append(" FROM ").append(tableName);
    query.append(" AS ").append(tableName); // in hsqldb this is necessary
    if (conditions != null && conditions.length() > 0) {
      query.append(" WHERE (").append(conditions).append(")");
    }
    String orderBy = dbConf.getInputOrderBy();
    if (orderBy != null && orderBy.length() > 0) {
      query.append(" ORDER BY ").append(orderBy);
    }
  } else {
    // PREBUILT QUERY
    query.append(dbConf.getInputQuery());
  }

  try {
    query.append(" LIMIT ").append(split.getLength());
    query.append(" OFFSET ").append(split.getStart());
  } catch (IOException ex) {
    // Ignore, will not throw.
  }
  return query.toString();
}

// 3. Write to HDFS with the ordinary context.write.
The way the read query is constructed shows the author gave it some thought, but personally I am not fully satisfied with the execution performance of the generated SQL. That may be for the sake of generality, or because the author was not very familiar with the data. For example, in Oracle a hint can be added so that the query uses direct-path reads, which can improve efficiency by an order of magnitude.
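As a rough illustration of that remark (this is not Sqoop code; the table, columns, and parallel degree are assumptions of mine), the kind of hinted Oracle query it has in mind might be built like this:

// Hypothetical sketch, not Sqoop code: an Oracle SELECT whose hints favor a full
// (direct-path, optionally parallel) scan over the generic LIMIT/OFFSET pattern.
// Table name, column names, and the parallel degree are made up for illustration.
public class OracleHintedQueryExample {

  public static String buildHintedQuery(String table, String[] columns, String conditions) {
    StringBuilder query = new StringBuilder();
    // FULL(t) asks for a full table scan; PARALLEL(t, 4) lets Oracle use
    // parallel multiblock (direct-path) reads on a large table.
    query.append("SELECT /*+ FULL(t) PARALLEL(t, 4) */ ");
    query.append(String.join(", ", columns));
    query.append(" FROM ").append(table).append(" t");
    if (conditions != null && !conditions.isEmpty()) {
      query.append(" WHERE (").append(conditions).append(")");
    }
    return query.toString();
  }

  public static void main(String[] args) {
    System.out.println(buildHintedQuery(
        "orders",
        new String[] {"id", "amount", "updated_at"},
        "updated_at >= DATE '2016-01-01'"));
  }
}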
Database client tool
This path uses the mysqldump client tool that ships with MySQL; the prerequisite is that the MySQL client is installed on the cluster machines.
// How the upper and lower bounds for the splits are obtained:
protected String getBoundingValsQuery() {
  // If the user has provided a query, use that instead.
  String userQuery = getDBConf().getInputBoundingQuery();
  if (null != userQuery) {
    return userQuery;
  }

  // Auto-generate one based on the table name we've been provided with.
  StringBuilder query = new StringBuilder();
  String splitCol = getDBConf().getInputOrderBy();
  query.append("SELECT MIN(").append(splitCol).append("), ");
  query.append("MAX(").append(splitCol).append(") FROM ");
  query.append(getDBConf().getInputTableName());
  String conditions = getDBConf().getInputConditions();
  if (null != conditions) {
    query.append(" WHERE ( " + conditions + " )");
  }
  return query.toString();
}

// 2. Read the data -- the most distinctive part of this implementation:
// a mysqldump command line is constructed to export the data.
public void map(String splitConditions, NullWritable val, Context context)
    throws IOException, InterruptedException {
  LOG.info("Beginning mysqldump fast path import");

  ArrayList<String> args = new ArrayList<String>();
  String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY);

  // We need to parse the connect string URI to determine the database name.
  // Using java.net.URL directly on the connect string will fail because
  // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the
  // scheme (everything before '://') and replace it with 'http', which we
  // know will work.
  String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
  String databaseName = JdbcUrl.getDatabaseName(connectString);
  String hostname = JdbcUrl.getHostName(connectString);
  int port = JdbcUrl.getPort(connectString);

  if (null == databaseName) {
    throw new IOException("Could not determine database name");
  }

  LOG.info("Performing import of table " + tableName + " from database " + databaseName);

  args.add(MySQLUtils.MYSQL_DUMP_CMD); // requires that this is on the path.

  String password = DBConfiguration.getPassword((JobConf) conf);
  String passwordFile = null;

  Process p = null;
  AsyncSink sink = null;
  AsyncSink errSink = null;
  PerfCounters counters = new PerfCounters();
  try {
    // --defaults-file must be the first argument.
    if (null != password && password.length() > 0) {
      passwordFile = MySQLUtils.writePasswordFile(conf);
      args.add("--defaults-file=" + passwordFile);
    }

    // Don't use the --where=" " version because spaces in it can
    // confuse Java, and adding in surrounding quotes confuses Java as well.
    String whereClause = conf.get(MySQLUtils.WHERE_CLAUSE_KEY, "(1=1)")
        + " AND (" + splitConditions + ")";
    args.add("-w");
    args.add(whereClause);

    args.add("--host=" + hostname);
    if (-1 != port) {
      args.add("--port=" + Integer.toString(port));
    }
    args.add("--skip-opt");
    args.add("--compact");
    args.add("--no-create-db");
    args.add("--no-create-info");
    args.add("--quick"); // no buffering
    args.add("--single-transaction");

    String username = conf.get(MySQLUtils.USERNAME_KEY);
    if (null != username) {
      args.add("--user=" + username);
    }

    // If the user supplied extra args, add them here.
    String [] extra = conf.getStrings(MySQLUtils.EXTRA_ARGS_KEY);
    if (null != extra) {
      for (String arg : extra) {
        args.add(arg);
      }
    }

    args.add(databaseName);
    args.add(tableName);
    ...
    // Actually start the mysqldump.
    p = Runtime.getRuntime().exec(args.toArray(new String[0]));
    ...
}

// 3. Write to HDFS -- also worth mentioning: the exported rows are reduced to a
// delimited format such as A,B,C (the delimiter can be set via a parameter) and
// then pushed to HDFS in one pass.
private static class CopyingStreamThread extends ErrorableThread {
  public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
  ...
  public void run() {
    BufferedReader r = null;
    ...
    r = new BufferedReader(new InputStreamReader(this.stream));

    // Actually do the read/write transfer loop here.
    int preambleLen = -1; // set to this for "undefined"
    while (true) {
      String inLine = r.readLine();
      if (null == inLine) {
        break; // EOF.
      }
      if (inLine.trim().length() == 0 || inLine.startsWith("--")) {
        continue; // comments and empty lines are ignored
      }

      // this line is of the form "INSERT .. VALUES ( actual value text
      // );" strip the leading preamble up to the '(' and the trailing
      // ');'.
      if (preambleLen == -1) {
        // we haven't determined how long the preamble is. It's constant
        // across all lines, so just figure this out once.
        String recordStartMark = "VALUES (";
        preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
      }

      // chop off the leading and trailing text as we write the
      // output to HDFS.
      int len = inLine.length() - 2 - preambleLen;
      context.write(inLine.substring(preambleLen, inLine.length() - 2) + "\n", null);
      counters.addBytes(1 + len);
    }
    ...
  }
}
Export
For export, Sqoop again provides two approaches: 1) JDBC; 2) export via the client tool.
JDBC
The JDBC export issues INSERT statements against the database and inserts in batches. Nothing unusual here; see the material on JDBC batch inserts for details.
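For reference, a minimal sketch of the batch-insert pattern this relies on (the JDBC URL, table, and column names are placeholders; a real exporter adds statement generation, transaction sizing, and retries on top):

// Minimal JDBC batch-insert sketch. URL, credentials, table and columns are
// placeholders; a MySQL driver must be on the classpath to actually run it.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class BatchInsertSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection(
            "jdbc:mysql://localhost:3306/test", "user", "password")) {
      conn.setAutoCommit(false);                       // commit per batch, not per row
      String sql = "INSERT INTO demo_table (id, name) VALUES (?, ?)";
      try (PreparedStatement ps = conn.prepareStatement(sql)) {
        for (int i = 0; i < 10000; i++) {
          ps.setInt(1, i);
          ps.setString(2, "row-" + i);
          ps.addBatch();                               // queue the row client-side
          if (i % 1000 == 0) {
            ps.executeBatch();                         // flush every 1000 rows
            conn.commit();
          }
        }
        ps.executeBatch();                             // flush the tail
        conn.commit();
      }
    }
  }
}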
Using the client tool
The MapReduce job first writes the HDFS data to a local path, into a file named after the table (created as a named FIFO in the code below); mysqlimport then reads that local file and loads the data into MySQL.
private void initMySQLImportProcess() throws IOException {
  File taskAttemptDir = TaskId.getLocalWorkPath(conf);

  this.fifoFile = new File(taskAttemptDir,
      conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
  String filename = fifoFile.toString();

  // Create the FIFO itself.
  try {
    new NamedFifo(this.fifoFile).create();
  } catch (IOException ioe) {
    // Command failed.
    LOG.error("Could not mknod " + filename);
    this.fifoFile = null;
    throw new IOException(
        "Could not create FIFO to interface with mysqlimport", ioe);
  }

  // Now open the connection to mysqlimport.
  ArrayList<String> args = new ArrayList<String>();

  String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
  String databaseName = JdbcUrl.getDatabaseName(connectString);
  String hostname = JdbcUrl.getHostName(connectString);
  int port = JdbcUrl.getPort(connectString);

  if (null == databaseName) {
    throw new IOException("Could not determine database name");
  }

  args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
  String password = DBConfiguration.getPassword((JobConf) conf);
  if (null != password && password.length() > 0) {
    passwordFile = new File(MySQLUtils.writePasswordFile(conf));
    args.add("--defaults-file=" + passwordFile);
  }

  String username = conf.get(MySQLUtils.USERNAME_KEY);
  if (null != username) {
    args.add("--user=" + username);
  }

  args.add("--host=" + hostname);
  if (-1 != port) {
    args.add("--port=" + Integer.toString(port));
  }

  args.add("--compress");
  args.add("--local");
  args.add("--silent");

  // Specify the subset of columns we're importing.
  DBConfiguration dbConf = new DBConfiguration(conf);
  String [] cols = dbConf.getInputFieldNames();
  if (null != cols) {
    StringBuilder sb = new StringBuilder();
    boolean first = true;
    for (String col : cols) {
      if (!first) {
        sb.append(",");
      }
      sb.append(col);
      first = false;
    }
    args.add("--columns=" + sb.toString());
  }

  // Specify the delimiters to use.
  int outputFieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY, (int) ',');
  int outputRecordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY, (int) '\n');
  int enclosedBy = conf.getInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY, 0);
  int escapedBy = conf.getInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY, 0);
  boolean encloseRequired = conf.getBoolean(
      MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);

  args.add("--fields-terminated-by=0x" + Integer.toString(outputFieldDelim, 16));
  args.add("--lines-terminated-by=0x" + Integer.toString(outputRecordDelim, 16));
  if (0 != enclosedBy) {
    if (encloseRequired) {
      args.add("--fields-enclosed-by=0x" + Integer.toString(enclosedBy, 16));
    } else {
      args.add("--fields-optionally-enclosed-by=0x" + Integer.toString(enclosedBy, 16));
    }
  }
  if (0 != escapedBy) {
    args.add("--escaped-by=0x" + Integer.toString(escapedBy, 16));
  }

  // These two arguments are positional and must be last.
  args.add(databaseName);
  args.add(filename);
  ...
  // Actually start mysqlimport.
  mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
  ...
}
DataX
DataX is a standalone (single-machine) tool whose source code Alibaba has open-sourced. The goal it pursues: every kind of data exchange should only need the one generic interface DataX provides, so it is simple to use and developers no longer have to learn to write MySQL, Oracle, MapReduce, etc. code. A great idea. Its structure, in brief:
Data transfer relies entirely on memory; the basic mechanism is similar to Flume's MemoryChannel, and under abnormal conditions both can lose data (interested readers can look at the Flume source code). Back to import and export: for both directions DataX provides a unified model that connects over JDBC (a minimal sketch of the memory-channel idea follows the DataX code below).

public void startRead(Configuration readerSliceConfig,
                      RecordSender recordSender,
                      TaskPluginCollector taskPluginCollector, int fetchSize) {
  String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
  String table = readerSliceConfig.getString(Key.TABLE);

  PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);

  LOG.info("Begin to read record by Sql: [{}\n] {}.", querySql, basicMsg);
  PerfRecord queryPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.SQL_QUERY);
  queryPerfRecord.start();

  Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl, username, password);

  // session config .etc related
  DBUtil.dealWithSessionConfig(conn, readerSliceConfig, this.dataBaseType, basicMsg);

  int columnNumber = 0;
  ResultSet rs = null;
  try {
    rs = DBUtil.query(conn, querySql, fetchSize);
    queryPerfRecord.end();

    ResultSetMetaData metaData = rs.getMetaData();
    columnNumber = metaData.getColumnCount();

    // This measures the time spent purely in ResultSet.next().
    PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
    allResultPerfRecord.start();

    long rsNextUsedTime = 0;
    long lastTime = System.nanoTime();
    while (rs.next()) {
      rsNextUsedTime += (System.nanoTime() - lastTime);
      this.transportOneRecord(recordSender, rs, metaData, columnNumber,
          mandatoryEncoding, taskPluginCollector);
      lastTime = System.nanoTime();
    }

    allResultPerfRecord.end(rsNextUsedTime);
    // The monitoring dashboard relies on this log line; previously
    // "Finish read record" covered the whole time of the SQL query plus result-next.
    LOG.info("Finished read record by Sql: [{}\n] {}.", querySql, basicMsg);
  } catch (Exception e) {
    throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
  } finally {
    DBUtil.closeDBResources(null, conn);
  }
}
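Here is the promised minimal sketch of the in-memory hand-off between a reader and a writer (my own illustration, not DataX's actual Channel/Record classes): a bounded queue provides back-pressure, and anything still queued is lost if the process dies, which is exactly the risk mentioned above.

// Assumed sketch of a memory channel: one reader thread, one writer thread,
// a bounded in-memory queue in between. Records still in the queue are lost
// if the JVM crashes before the writer drains them.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MemoryChannelSketch {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> channel = new ArrayBlockingQueue<>(1024); // bounded buffer

    Thread reader = new Thread(() -> {
      try {
        for (int i = 0; i < 10_000; i++) {
          channel.put("record-" + i);   // blocks when the writer falls behind
        }
        channel.put("EOF");             // poison pill to stop the writer
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    Thread writer = new Thread(() -> {
      try {
        String record;
        while (!"EOF".equals(record = channel.take())) {
          System.out.println(record);   // stand-in for writing to the target
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    reader.start();
    writer.start();
    reader.join();
    writer.join();
  }
}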
Summary
Both Sqoop and DataX use a plugin model that makes it easy for users to develop new reader/writer plugins. Judging from the existing code, though, Sqoop's direct mode performs better than DataX, and because Sqoop stands on the shoulders of the elephant (Hadoop), its stability is also higher than that of the single-machine DataX.
Extensions
- A sketch of a distributed DataX cluster. A sudden thought: if DataX were not single-machine, would I consider using it? I sketched a rough design and drew a diagram to discuss with everyone. The basic flow: every node runs a DataX service (a thin online service has to be wrapped around DataX to launch it) and periodically reports its status (liveness, memory, CPU load, and so on) to the Consul configuration center. The configuration in the center is managed by a resource-management module. When a job submits an import/export request, the number of splits and the resources each split needs are computed first, then resources are requested from the resource manager. Based on the granted resources, job management sends the start request to the service registry to launch the job; if the job fails, the failure is reported back to job management and the job re-enters the pending-schedule list.
- When Java shells out to run a command, two streams have to be handled: the process's standard output and its error stream. The latter is the easiest to overlook and directly affects the program's robustness; Sqoop's client-tool import and export give us a textbook example of handling both (a minimal sketch appears after the Fork/Join example below).
- Splitting data and then processing the pieces in parallel is a very common pattern in data processing; here is a fairly classic example:
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class AccumulateNum implements Serializable {

  public final static List<Integer> LST_NUM = new ArrayList<Integer>();
  static {
    for (int i = 1; i <= 100000; i++) {
      LST_NUM.add(i);
    }
  }

  public static class AccumulateTask extends RecursiveTask<Long> {
    // Fork granularity: below this size, sum directly instead of splitting further.
    private final static int THRESHOLD = 1000;
    private final int start;
    private final int end;

    public AccumulateTask(int start, int end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public Long compute() {
      long sum = 0;
      if (end <= start) {
        return sum;
      }
      if (end - start <= THRESHOLD) {
        for (int i = start; i <= end; i++) {
          sum += LST_NUM.get(i);
        }
        return sum;
      }
      // Split the range in half, fork both halves, and join the partial sums.
      int middle = (start + end) / 2;
      AccumulateTask left = new AccumulateTask(start, middle);
      AccumulateTask right = new AccumulateTask(middle + 1, end);
      left.fork();
      right.fork();
      return left.join() + right.join();
    }
  }

  public static void main(String[] args) throws Exception {
    ForkJoinPool pool = new ForkJoinPool();
    long startTime = System.nanoTime();
    Future<Long> future = pool.submit(new AccumulateTask(0, LST_NUM.size() - 1));
    System.out.println(future.get());
    long endTime = System.nanoTime(); // measure after the result is available
    System.out.println(endTime - startTime + "ns");
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.SECONDS);
  }
}
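And here is the minimal sketch promised for the shell-invocation point above (my own illustration using plain java.lang.Process, not Sqoop's AsyncSink classes; the command is just an example): both the child process's stdout and stderr are drained on their own threads so the child can never block on a full pipe buffer.

// Assumed sketch: drain both stdout and stderr of a child process.
// Forgetting the error stream is the usual cause of hung subprocesses.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class DrainBothStreams {

  static Thread drain(InputStream in, String tag) {
    Thread t = new Thread(() -> {
      try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) {
        String line;
        while ((line = r.readLine()) != null) {
          System.out.println(tag + line);      // or log it / copy it to HDFS
        }
      } catch (IOException ignored) {
        // stream closes when the process exits
      }
    });
    t.start();
    return t;
  }

  public static void main(String[] args) throws Exception {
    Process p = Runtime.getRuntime().exec(new String[] {"ls", "-l"});
    Thread out = drain(p.getInputStream(), "[stdout] ");
    Thread err = drain(p.getErrorStream(), "[stderr] ");   // the easily forgotten one
    int exitCode = p.waitFor();
    out.join();
    err.join();
    System.out.println("exit code: " + exitCode);
  }
}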