flume_实现自定义MysqlSink,写入mysql表
2020-08-12 23:56阅读:
由于篇幅美观性,将flume实现mysql写入的自定义sink单独一篇文章。
引用sink2的配置
#####发送到本地文件#####
# 订阅Sink
agent2.sinks.k2.type = org.flume.sink.mysql.MysqlSink
agent2.sinks.k2.hostname = 10.19.138.232
agent2.sinks.k2.port = 3306
agent2.sinks.k2.databaseName = internet
agent2.sinks.k2.tableName = i_intelligentcustservice
agent2.sinks.k2.username = username
agent2.sinks.k2.password = password
agent2.sinks.k2.batchSize = 20
吐槽下,IDEA使用MAVEN开发简直不要太好用,
比之前的eclipse好用,应用第三方包,直接在pom.xml里配置,自动下载。
新建项目的话需要设置下mavent仓库的位置。
项目POM.XML文件,由于格式在博客里不显示,只能贴图片,见谅。
引用的包具体是,junit、log4j、flume-ng-sdk、flume-ng-core、flume-ng-configuration、mysql-connector-;
font-family: 宋体; font-size: 9pt;'>fastjson
javabean文件
package org.flume.sink.mysql;
public class IntelligentBean {
private String uid;
private String operchannelname;
private String msgcontent;
private String msgdate;
private String msgtime;
private String question;
private String answer;
private String entranceid;
public String getUid() {
return uid;
}
public String getOperchannelname() {
return operchannelname;
}
public String getMsgcontent() {
return msgcontent;
}
public String getMsgdate() {
return msgdate;
}
public String getMsgtime() {
return msgtime;
}
public String getQuestion() {
return question;
}
public String getAnswer() {
return answer;
}
public String getEntranceid() {
return entranceid;
}
public void setUid(String uid) {
this.uid = uid;
}
public void setOperchannelname(String
operchannelname) {
this.operchannelname =
operchannelname;
}
public void setMsgcontent(String msgcontent)
{
this.msgcontent = msgcontent;
}
public void setMsgdate(String msgdate) {
this.msgdate = msgdate;
}
public void setMsgtime(String msgtime) {
this.msgtime = msgtime;
}
public void setQuestion(String question) {
this.question = question;
}
public void setAnswer(String answer) {
this.answer = answer;
}
public void setEntranceid(String entranceid)
{
this.entranceid = entranceid;
}
}
主要的MysqlSink类内容,
package org.flume.sink.mysql;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.sql.*;
import java.util.List;
import java.util.Map;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MysqlSink extends AbstractSink implements Configurable
{
private Logger LOG =
LoggerFactory.getLogger(MysqlSink.class);
private String hostname;
private String port;
private String databaseName;
private String tableName;
private String username;
private String password;
private PreparedStatement preparedStatement;
private Connection conn;
private int batchSize; //每次提交的批次大小
public MysqlSink() {
LOG.info('MysqlSink
start...');
}
public void configure(Context context) {
hostname =
context.getString('hostname');
Preconditions.checkNotNull(hostname, 'hostname must be
set!!');
port =
context.getString('port');
Preconditions.checkNotNull(port,
'port must be set!!');
databaseName =
context.getString('databaseName');
Preconditions.checkNotNull(databaseName, 'databaseName must be
set!!');
tableName =
context.getString('tableName');
Preconditions.checkNotNull(tableName, 'tableName must be
set!!');
username =
context.getString('username');
Preconditions.checkNotNull(username, 'user must be set!!');
password =
context.getString('password');
Preconditions.checkNotNull(password, 'password must be
set!!');
batchSize =
context.getInteger('batchSize', 100);
Preconditions.checkNotNull(batchSize > 0, 'batchSize must be a
positive number!!');
}
@Override
public void start() {
super.start();
try {
//调用Class.forName()方法加载驱动程序
Class.forName('com.mysql.jdbc.Driver');
} catch (ClassNotFoundException e)
{
e.printStackTrace();
}
// url重连
String url = 'jdbc:mysql://' + hostname +
':' + port + '/' + databaseName + '?autoReconnect=true';
//调用DriverManager对象的getConnection()方法,获得一个Connection对象
try {
conn =
DriverManager.getConnection(url, username, password);
conn.setAutoCommit(false);
//创建一个Statement对象
preparedStatement =
conn.prepareStatement('insert into ' + tableName +
'
(uid,entranceid,msgcontent,answer,question,operchannelname,msgtime,msgdate,inserttime)
values (?,?,?,?,?,?,?,?,now())');
} catch (SQLException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void stop() {
super.stop();
if (preparedStatement != null)
{
try {
preparedStatement.close();
} catch
(SQLException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch
(SQLException e) {
e.printStackTrace();
}
}
}
public Status process() throws
EventDeliveryException {
Status result = Status.READY;
Channel channel =
getChannel();
Transaction transaction =
channel.getTransaction();
Event event = null;
String content = '';
transaction.begin();
List lists =
Lists.newArrayList();
try {
for (int i = 0; i
< batchSize; i++) {
event = channel.take();
if
(event != null) {
content = new String(event.getBody());
System.out.println(content+'11111111111111111111111111111111');
IntelligentBean intel=new IntelligentBean();
if(!''.equals(content)){
JSONObject jsonContent =
JSONObject.parseObject(content);
intel.setUid(jsonContent.getString('uid'));
intel.setAnswer(jsonContent.getString('answer'));
intel.setEntranceid(jsonContent.getString('entranceId'));
intel.setMsgcontent(jsonContent.getString('msgContent'));
intel.setMsgdate(jsonContent.getString('msgDate'));