import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import com.cloudera.flume.handlers.thrift.Priority;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Client;
public class FlumeLogAppender extends AppenderSkeleton {
private String hostname;
private String host;
private int port;
private Client client;
private TTransport transport;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public void configure() {
try {
if (hostname == null) {
try {
hostname = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
// can't get hostname
}
}
// Thrift boilerplate code
transport = new TSocket(new Socket(host, port));
TBinaryProtocol protocol = new TBinaryProtocol(transport);
client = new Client(protocol);
} catch (Exception e) {
System.out.println(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
connect();
}
}
@Override
public void append(LoggingEvent loggingEvent) {
connect();
ThriftFlumeEvent tfe = new ThriftFlumeEvent();
try {
String message = layout.format(loggingEvent);
Map fields = new HashMap();
tfe.fields = fields;
tfe.priority = Priority.INFO;
tfe.timestamp = loggingEvent.getTimeStamp();
tfe.host = hostname;
tfe.body = ByteBuffer.wrap(message.getBytes());
} catch (Exception e) {
System.out.println(e);
System.out.println(loggingEvent);
}
while (true) {
try {
client.append(tfe);
break;
} catch (TTransportException e) {
System.out.println(e);
transport.close();
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
connect();
} catch (Exception e) {
System.out.println(e);
}
}
}
public void connect() {
if (transport != null && transport.isOpen())
return;
if (transport != null && transport.isOpen() == false) {
transport.close();
}
configure();
}
public void close() {
if (transport != null && transport.isOpen()) {
transport.close();
}
}
public boolean requiresLayout() {
return true;
}
}
Thursday, March 08, 2012
Flume Log4j Appender
Using Thrift 0.4.0, Flume 0.9.3-cdh3u0
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment