DBInputFormat để chuyển dữ liệu từ SQL sang cơ sở dữ liệu NoSQL



Mục tiêu của blog này là tìm hiểu cách chuyển dữ liệu từ cơ sở dữ liệu SQL sang HDFS, cách chuyển dữ liệu từ cơ sở dữ liệu SQL sang cơ sở dữ liệu NoSQL.

Trong blog này, chúng ta sẽ khám phá các khả năng và khả năng của một trong những thành phần quan trọng nhất của công nghệ Hadoop, tức là MapReduce.

Ngày nay, các công ty đang sử dụng khuôn khổ Hadoop làm lựa chọn đầu tiên để lưu trữ dữ liệu vì khả năng xử lý dữ liệu lớn hiệu quả của nó. Nhưng chúng ta cũng biết rằng dữ liệu rất linh hoạt và tồn tại ở nhiều cấu trúc và định dạng khác nhau. Để kiểm soát một lượng lớn dữ liệu và các định dạng khác nhau của nó, cần phải có một cơ chế để phù hợp với tất cả các giống và tạo ra một kết quả hiệu quả và nhất quán.





Thành phần mạnh nhất trong khung công tác Hadoop là MapReduce có thể cung cấp khả năng kiểm soát dữ liệu và cấu trúc của nó tốt hơn so với các đối tác khác của nó. Mặc dù nó đòi hỏi chi phí cao của đường cong học tập và độ phức tạp của lập trình, nhưng nếu bạn có thể xử lý những sự phức tạp này, bạn chắc chắn có thể xử lý bất kỳ loại dữ liệu nào với Hadoop.

Khung công tác MapReduce chia tất cả các tác vụ xử lý của nó về cơ bản thành hai giai đoạn: Bản đồ và Giảm.



Chuẩn bị dữ liệu thô của bạn cho các giai đoạn này yêu cầu hiểu biết về một số lớp và giao diện cơ bản. Loại siêu cấp cho các quá trình tái xử lý này là Định dạng đầu vào.

Các Định dạng đầu vào lớp là một trong những lớp cốt lõi trong Hadoop MapReduce API. Lớp này chịu trách nhiệm xác định hai điều chính:

  • Phân chia dữ liệu
  • Trình đọc ghi

Phân chia dữ liệu là một khái niệm cơ bản trong khung Hadoop MapReduce xác định cả kích thước của các nhiệm vụ bản đồ riêng lẻ và máy chủ thực thi tiềm năng của nó. Các Trình đọc ghi chịu trách nhiệm đọc các bản ghi thực tế từ tệp đầu vào và gửi chúng (dưới dạng cặp khóa / giá trị) cho người lập bản đồ.



Số lượng người lập bản đồ được quyết định dựa trên số lượng tách. Công việc của InputFormat là tạo các phần tách. Hầu hết kích thước phân chia thời gian tương đương với kích thước khối nhưng không phải lúc nào các phân chia cũng được tạo dựa trên kích thước khối HDFS. Nó hoàn toàn phụ thuộc vào cách phương thức getSplits () của InputFormat của bạn đã được ghi đè.

Có một sự khác biệt cơ bản giữa phân chia MR và khối HDFS. Một khối là một phần dữ liệu vật lý trong khi phần tách chỉ là một phần hợp lý mà một người lập bản đồ đọc. Một phân tách không chứa dữ liệu đầu vào, nó chỉ chứa một tham chiếu hoặc địa chỉ của dữ liệu. Một sự phân chia về cơ bản có hai thứ: Độ dài tính bằng byte và một tập hợp các vị trí lưu trữ, chỉ là các chuỗi.

Để hiểu rõ hơn điều này, hãy lấy một ví dụ: Xử lý dữ liệu được lưu trữ trong MySQL của bạn bằng MR. Vì không có khái niệm về khối trong trường hợp này, nên lý thuyết: 'các phần tách luôn được tạo dựa trên khối HDFS',thất bại. Một khả năng là tạo các phân tách dựa trên phạm vi hàng trong bảng MySQL của bạn (và đây là những gì DBInputFormat thực hiện, một định dạng đầu vào để đọc dữ liệu từ cơ sở dữ liệu quan hệ). Chúng ta có thể có k số lần tách gồm n hàng.

Chỉ đối với InputFormats dựa trên FileInputFormat (một InputFormat để xử lý dữ liệu được lưu trữ trong các tệp) thì các phần tách được tạo dựa trên tổng kích thước, tính bằng byte, của các tệp đầu vào. Tuy nhiên, kích thước khối FileSystem của các tệp đầu vào được coi là giới hạn trên cho các phần tách đầu vào. Nếu bạn có một tệp nhỏ hơn kích thước khối HDFS, bạn sẽ chỉ nhận được 1 trình ánh xạ cho tệp đó. Nếu bạn muốn có một số hành vi khác nhau, bạn có thể sử dụng mapred.min.split.size. Nhưng nó lại chỉ phụ thuộc vào getSplits () của InputFormat của bạn.

Chúng tôi có rất nhiều định dạng đầu vào sẵn có trong gói org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

Saltstack vs rối vs đầu bếp

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Mặc định là TextInputFormat.

Tương tự, chúng tôi có rất nhiều định dạng đầu ra đọc dữ liệu từ bộ giảm và lưu trữ nó vào HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Mặc định là TextOutputFormat.

Khi bạn đọc xong blog này, bạn sẽ biết được:

  • Cách viết chương trình thu nhỏ bản đồ
  • Giới thiệu về các loại InputFormats khác nhau có sẵn trong Mapreduce
  • Sự cần thiết của InputFormats là gì
  • Cách viết InputFormats tùy chỉnh
  • Cách chuyển dữ liệu từ cơ sở dữ liệu SQL sang HDFS
  • Cách chuyển dữ liệu từ cơ sở dữ liệu SQL (tại đây MySQL) sang cơ sở dữ liệu NoSQL (tại đây Hbase)
  • Cách chuyển dữ liệu từ một cơ sở dữ liệu SQL sang bảng khác trong cơ sở dữ liệu SQL (Có lẽ điều này có thể không quan trọng lắm nếu chúng ta thực hiện việc này trong cùng một cơ sở dữ liệu SQL. Tuy nhiên, không có gì sai khi có cùng kiến ​​thức. Bạn chưa bao giờ biết làm thế nào nó có thể được sử dụng)

Điều kiện tiên quyết:

  • Hadoop được cài đặt sẵn
  • SQL được cài đặt sẵn
  • Hbase được cài đặt sẵn
  • Hiểu biết cơ bản về Java
  • MapReduce kiến ​​thức
  • Kiến thức cơ bản về khung Hadoop

Hãy hiểu tuyên bố vấn đề mà chúng ta sẽ giải quyết ở đây:

Chúng tôi có một bảng nhân viên trong MySQL DB trong cơ sở dữ liệu quan hệ Edureka của chúng tôi. Bây giờ theo yêu cầu kinh doanh, chúng tôi phải chuyển tất cả dữ liệu có sẵn trong DB quan hệ sang hệ thống tệp Hadoop, tức là HDFS, NoSQL DB được gọi là Hbase.

Chúng tôi có nhiều tùy chọn để thực hiện tác vụ này:

  • Sqoop
  • Flume
  • MapReduce

Bây giờ, bạn không muốn cài đặt và cấu hình bất kỳ công cụ nào khác cho thao tác này. Bạn chỉ còn một tùy chọn duy nhất là MapReduce khung xử lý của Hadoop. Khung công tác MapReduce sẽ cung cấp cho bạn toàn quyền kiểm soát dữ liệu trong khi chuyển. Bạn có thể thao tác các cột và đặt trực tiếp tại bất kỳ vị trí nào trong hai vị trí mục tiêu.

Ghi chú:

  • Chúng tôi cần tải xuống và đặt trình kết nối MySQL trong classpath của Hadoop để tìm nạp các bảng từ bảng MySQL. Để thực hiện việc này, hãy tải xuống trình kết nối com.mysql.jdbc_5.1.5.jar và giữ nó trong thư mục Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Ngoài ra, hãy đặt tất cả các lọ Hbase trong Hadoop classpath để chương trình MR của bạn truy cập Hbase. Để làm điều này, hãy thực hiện lệnh sau :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Các phiên bản phần mềm mà tôi đã sử dụng để thực hiện tác vụ này là:

  • Hadooop-2.3.0
  • HBase 0,98,9-Hadoop2
  • Nhật thực mặt trăng

Để tránh chương trình gặp bất kỳ vấn đề tương thích nào, tôi yêu cầu người đọc của tôi chạy lệnh với môi trường tương tự.

DBInput tùy chỉnh có thể ghi:

package com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .W ghi được nhập vào org.apache.hadoop.mapreduce.lib.db.DB Lớp công khai có thể ghi DBInputW Khả năng thực hiện có thể ghi được, ghi được dữ liệu {private int id private String name, dept public void readFields (DataInput in) ném IOException {} public void readFields (ResultSet rs) ném SQLException // Đối tượng tập kết quả đại diện cho dữ liệu được trả về từ câu lệnh SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) ném IOException { } public void write (PreparedStatement ps) ném SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

DBOutputW ghi tùy chỉnh:

package com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .W ghi nhập khẩu org.apache.hadoop.mapreduce.lib.db.DB ghi lớp công khai DBOutputW Khả năng thực hiện ghi có thể ghi được, ghi được dữ liệu {private String name private int id private String dept public DBOutputW rán (String name, int id, String dept) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) ném IOException {} public void readFields (ResultSet rs) ném SQLException {} public void write (DataOutput out) ném IOException {} public void write (PreparedStatement ps) ném SQLException {ps.setString (1, name) ps.setInt (2, id) ps.setString (3, dept)}}

Bảng đầu vào:

tạo cơ sở dữ liệu edureka
tạo bảng emp (empid int not null, name varchar (30), dept varchar (20), khóa chính (empid))
chèn vào các giá trị emp (1, 'abhay', 'developmentement'), (2, 'brundesh', 'test')
chọn * từ trống

Trường hợp 1: Chuyển từ MySQL sang HDFS

package com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWosystem public class MainDbtohdfs {public static void main (String [] args) throws Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // driver class' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // tên người dùng' root ') // mật khẩu Job job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWuality.class) job.setInputFormatormatClass (DBInputFormat.class) File new Path (args [0])) DBInputFormat.setInput (job, DBInputW rán.class, 'emp', // tên bảng nhập null, null, new String [] {'empid', 'name', 'dept'} / / table column) Path p = new Path (args [0]) FileSystem fs = FileSystem.get (new URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Đoạn mã này cho phép chúng ta chuẩn bị hoặc định cấu hình inputformat để truy cập vào SQL DB nguồn của chúng ta. Tham số bao gồm lớp trình điều khiển, URL có địa chỉ của cơ sở dữ liệu SQL, tên người dùng và mật khẩu của nó.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // tên người dùng 'root') //mật khẩu

Đoạn mã này cho phép chúng ta chuyển thông tin chi tiết của các bảng trong cơ sở dữ liệu và đặt nó trong đối tượng công việc. Các tham số tất nhiên bao gồm cá thể công việc, lớp có thể ghi tùy chỉnh phải triển khai giao diện DBWille, tên bảng nguồn, điều kiện nếu có rỗng, bất kỳ tham số sắp xếp nào khác rỗng, danh sách các cột bảng tương ứng.

DBInputFormat.setInput (job, DBInputWults.class, 'emp', // input table name null, null, new String [] {'empid', 'name', 'dept'} // cột trong bảng)

Người vẽ bản đồ

package com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWorites import org.apache.hadoop.io.Text import org.apache.hadoop.io Bản đồ lớp công khai .IntW ghi mở rộng Bản đồ {
bản đồ void được bảo vệ (khóa ghi dài, giá trị DBInputW ghi, ngữ cảnh ctx) {try {String name = value.getName () IntWlike id = new IntW ... (value.getId ()) String dept = value.getDept ()
ctx.write (Văn bản mới (tên + '' + id + '' + dept), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptException e) {e.printStackTrace ()}}}

Giảm tốc: Bộ giảm nhận dạng được sử dụng

Lệnh chạy:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Đầu ra: Bảng MySQL được chuyển sang HDFS

hadoop dfs -ls / dbtohdfs / *

Trường hợp 2: Chuyển từ một bảng trong MySQL sang bảng khác trong MySQL

tạo bảng đầu ra trong MySQL

tạo bảng nhân viên1 (tên varchar (20), id int, dept varchar (20))

package com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWorites import org.apache.hadoop.io.Null Mainonetable_to_other_table {public static void main (String [] args) ném Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // user name' root ') // mật khẩu Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWained.class) job.setOutputKeyClass (DBOutputWuality.class) job.setOutputValueClass (Nul lWlike.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputW St.class, 'emp', // input table name null, null, new String [] {'empidring ',' name ',' dept '} // cột trong bảng) DBOutputFormat.setOutput (job,' worker1 ', // tên bảng xuất mới String [] {' name ',' id ',' dept '} // bảng cột) System.exit (job.waitForCompletion (true)? 0: 1)}}

Đoạn mã này cho phép chúng ta cấu hình tên bảng đầu ra trong SQL DB, các tham số lần lượt là phiên bản công việc, tên bảng đầu ra và tên cột đầu ra.

DBOutputFormat.setOutput (job, 'worker1', // tên bảng xuất mới String [] {'name', 'id', 'dept'} // cột của bảng)

Người vẽ bản đồ: Giống như Trường hợp 1

Hộp giảm tốc:

package com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWlike import org.apache.hadoop.io Lớp công khai. (dòng [0] .toString (), Integer.parseInt (dòng [1] .toString ()), dòng [2] .toString ()), NullWuality.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptException e) {e.printStackTrace ()}}}

Lệnh chạy:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Đầu ra: Dữ liệu được truyền từ Bảng EMP trong MySQL sang Bảng khác Nhân viên1 trong MySQL

Trường hợp 3: Chuyển từ Bảng trong MySQL sang Bảng NoSQL (Hbase)

Tạo bảng Hbase để chứa đầu ra từ bảng SQL:

tạo 'nhân viên', 'chính thức_info'

Hạng lái xe:

package Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWorites import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) throws Exception {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost: 3306 / edureka' , // url db 'root', // tên người dùng 'root') // mật khẩu Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s Ethereum class) DBInputFormat.setInput (job, DBInputWosystem.class, 'emp', // nhập tên bảng null, null, new String [] {'empid', 'name', 'dept'} // cột trong bảng) System.exit (job.waitForCompletion (true)? 0: 1)}}

Đoạn mã này cho phép bạn định cấu hình lớp khóa đầu ra mà trong trường hợp của hbase là ImmutableBytesWained

job.setMapOutputKeyClass (ImmutableBytesWorites.class) job.setMapOutputValueClass (Text.class)

Ở đây chúng tôi đang chuyển tên bảng hbase và bộ giảm tốc để hoạt động trên bảng.

TableMapReduceUtil.initTableReducerJob ('nhân viên', Reduce.class, công việc)

Người vẽ bản đồ:

package Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWorites import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWlike import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntW ghi lớp công khai Bản đồ mở rộng Bản đồ mở rộng Bản đồ {private IntWlike one = new IntWlike (1) được bảo vệ void map (LongWlike id, DBInputW ghi giá trị, Context context) {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesW ghi (Bytes.toBytes (cd)), new Text (line + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptException e) {e.printStackTrace ()}}}

Trong đoạn mã này, chúng ta đang lấy các giá trị từ các getters của lớp DBinputwording và sau đó chuyển chúng vào
ImmutableBytesW ghi được để chúng tiếp cận trình giảm thiểu ở dạng bytewriatble mà Hbase hiểu được.

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWorites (Bytes.toBytes (cd)), new Text (line + '' + dept ))

Hộp giảm tốc:

package Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWorites import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Rút gọn mở rộng TableReducer {public void Reduce (ImmutableBytesWorites key, Iterable values, Context context) ném IOException, InterruptException {String [] gây ra = null // Giá trị vòng lặp for (Văn bản val: giá trị) {gây ra = val.toString (). split ('')} // Đặt vào HBase Đặt put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info' ' ), Bytes.toBytes ('name'), Bytes.toBytes (gây ra [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('Department'), Bytes.toBytes (nguyên nhân [1 ])) context.write (key, put)}}

Đoạn mã này cho phép chúng tôi quyết định chính xác hàng và cột mà chúng tôi sẽ lưu trữ các giá trị từ trình giảm. Ở đây chúng tôi đang lưu trữ từng trống trong hàng riêng biệt vì chúng tôi đã tạo trống dưới dạng khóa hàng sẽ là duy nhất. Trong mỗi hàng, chúng tôi đang lưu trữ thông tin chính thức của nhân viên trong họ cột “official_info” dưới các cột “tên” và “bộ phận” tương ứng.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (gây ra [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('Department'), Bytes.toBytes (nguyên nhân [1])) context.write (key, put)

Dữ liệu được truyền trong Hbase:

quét nhân viên

Như chúng ta thấy, chúng ta đã có thể hoàn thành nhiệm vụ di chuyển dữ liệu kinh doanh của mình từ SQL DB quan hệ sang NoSQL DB thành công.

Trong blog tiếp theo, chúng ta sẽ tìm hiểu cách viết và thực thi mã cho các định dạng đầu vào và đầu ra khác.

Tiếp tục đăng nhận xét, câu hỏi hoặc bất kỳ phản hồi nào của bạn. Tôi rất thích nghe từ bạn.

Có một câu hỏi cho chúng tôi? Vui lòng đề cập đến nó trong phần bình luận và chúng tôi sẽ liên hệ lại với bạn.

Bài viết liên quan: