Thursday, March 08, 2012

Flume Log4j Appender

Using Thrift 0.4.0, Flume 0.9.3-cdh3u0

import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import com.cloudera.flume.handlers.thrift.Priority;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Client;

/**
 * Log4j appender that formats each logging event with the configured layout
 * and sends it to a Flume Thrift event server listening on {@code host:port}.
 *
 * <p>Delivery is blocking: when the server cannot be reached the appender
 * waits {@link #RETRY_DELAY_MS} milliseconds and tries again, until either the
 * connection succeeds or the calling thread is interrupted. Diagnostics are
 * written to {@code System.out} rather than to a logger, so that a failure in
 * this appender cannot feed back into the logging system it serves.
 *
 * <p>NOTE(review): every event is tagged {@code Priority.INFO} regardless of
 * the log4j level of the original event - confirm whether the level should be
 * mapped onto the Flume priority.
 */
public class FlumeLogAppender extends AppenderSkeleton {

 /** Pause between two connection or delivery attempts, in milliseconds. */
 private static final long RETRY_DELAY_MS = 1000L;

 /** Fixed encoding of the event body, so the bytes do not depend on the JVM's default charset. */
 private static final String BODY_ENCODING = "UTF-8";

 /** Name reported as the origin of the events; resolved from the local host when not set. */
 private String hostname;
 /** Host of the Flume Thrift event server. */
 private String host;
 /** Port of the Flume Thrift event server. */
 private int port;

 private Client client;
 private TTransport transport;

 public String getHost() {
  return host;
 }

 public void setHost(String host) {
  this.host = host;
 }

 public int getPort() {
  return port;
 }

 public void setPort(int port) {
  this.port = port;
 }

 public String getHostname() {
  return hostname;
 }

 public void setHostname(String hostname) {
  this.hostname = hostname;
 }

 /**
  * Resolves the origin host name (if not configured) and opens the Thrift
  * connection to {@code host:port}.
  *
  * <p>A failed attempt is reported on {@code System.out} and repeated after
  * {@link #RETRY_DELAY_MS} milliseconds. The retry is an iterative loop, so a
  * long outage cannot exhaust the call stack. The method returns without a
  * connection only when the calling thread is interrupted while waiting; the
  * interrupt flag is left set in that case.
  */
 public void configure() {
  resolveHostname();

  while (true) {
   try {
    // Thrift boilerplate code
    transport = new TSocket(new Socket(host, port));
    client = new Client(new TBinaryProtocol(transport));
    return;
   } catch (Exception e) {
    System.out.println(e);
    if (!pauseBeforeRetry()) {
     // Interrupted: stop retrying instead of spinning without a delay.
     return;
    }
   }
  }
 }

 /**
  * Formats the event and sends it to the Flume server.
  *
  * <p>Transport failures close the connection, wait, reconnect and resend the
  * same event. The event is dropped (after being reported on
  * {@code System.out}) when it cannot be formatted, when the send fails for a
  * reason other than a transport error, or when the thread is interrupted
  * while waiting for a reconnect.
  *
  * @param loggingEvent the event to forward
  */
 @Override
 public void append(LoggingEvent loggingEvent) {
  // Connect first: this also resolves the host name stamped on the event.
  connect();

  ThriftFlumeEvent tfe = buildEvent(loggingEvent);
  if (tfe == null) {
   // Formatting failed; do not ship an empty event.
   return;
  }

  while (isConnected()) {
   try {
    client.append(tfe);
    return;
   } catch (TTransportException e) {
    System.out.println(e);
    transport.close();
    if (!pauseBeforeRetry()) {
     return;
    }
    connect();
   } catch (Exception e) {
    // Not a connectivity problem, so resending the same event would fail the
    // same way forever. Drop the event and start the next one on a fresh
    // connection.
    System.out.println(e);
    transport.close();
    return;
   }
  }
 }

 /**
  * Ensures there is an open transport: does nothing when one is already open,
  * otherwise closes the stale transport (if any) and calls {@link #configure()}.
  */
 public void connect() {
  if (transport != null) {
   if (transport.isOpen()) {
    return;
   }
   transport.close();
  }
  configure();
 }

 /** Closes the transport if it is currently open. */
 public void close() {
  if (transport != null && transport.isOpen()) {
   transport.close();
  }
 }

 /**
  * @return {@code true}: the event body is produced by the configured layout
  */
 public boolean requiresLayout() {
  return true;
 }

 /** Fills in {@code hostname} from the local host's canonical name when it has not been set. */
 private void resolveHostname() {
  if (hostname != null) {
   return;
  }
  try {
   hostname = InetAddress.getLocalHost().getCanonicalHostName();
  } catch (UnknownHostException e) {
   // Not fatal: events are sent without a host name and resolution is
   // attempted again on the next configure().
   System.out.println(e);
  }
 }

 /**
  * Builds the Thrift event for a logging event.
  *
  * @return the populated event, or {@code null} when the event could not be
  *         formatted or encoded (the failure is reported on {@code System.out})
  */
 private ThriftFlumeEvent buildEvent(LoggingEvent loggingEvent) {
  try {
   String message = layout.format(loggingEvent);
   Map<String, ByteBuffer> fields = new HashMap<String, ByteBuffer>();

   ThriftFlumeEvent tfe = new ThriftFlumeEvent();
   tfe.fields = fields;
   tfe.priority = Priority.INFO;
   tfe.timestamp = loggingEvent.getTimeStamp();
   tfe.host = hostname;
   tfe.body = ByteBuffer.wrap(message.getBytes(BODY_ENCODING));
   return tfe;
  } catch (Exception e) {
   System.out.println(e);
   System.out.println(loggingEvent);
   return null;
  }
 }

 /** Tells whether a client exists and its transport reports itself open. */
 private boolean isConnected() {
  return client != null && transport != null && transport.isOpen();
 }

 /**
  * Sleeps for {@link #RETRY_DELAY_MS} milliseconds.
  *
  * @return {@code true} after a full pause; {@code false} when the thread was
  *         interrupted, in which case the interrupt flag is restored
  */
 private boolean pauseBeforeRetry() {
  try {
   Thread.sleep(RETRY_DELAY_MS);
   return true;
  } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
   return false;
  }
 }
}

No comments: