Thursday, March 08, 2012

Capistrano - App Deployment

Flume Log4j Appender

Using Thrift 0.4.0, Flume 0.9.3-cdh3u0

import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import com.cloudera.flume.handlers.thrift.Priority;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Client;

public class FlumeLogAppender extends AppenderSkeleton {

 private String hostname;
 private String host;
 private int port;

 private Client client;
 private TTransport transport;

 public String getHost() {
  return host;
 }

 public void setHost(String host) {
  this.host = host;
 }

 public int getPort() {
  return port;
 }

 public void setPort(int port) {
  this.port = port;
 }

 public String getHostname() {
  return hostname;
 }

 public void setHostname(String hostname) {
  this.hostname = hostname;
 }

 public void configure() {
  try {
   if (hostname == null) {
    try {
     hostname = InetAddress.getLocalHost().getCanonicalHostName();
    } catch (UnknownHostException e) {
     // can't get hostname
    }
   }

   // Thrift boilerplate code
   transport = new TSocket(new Socket(host, port));
   TBinaryProtocol protocol = new TBinaryProtocol(transport);
   client = new Client(protocol);
  } catch (Exception e) {
   System.out.println(e);
   try {
    Thread.sleep(1000);
   } catch (InterruptedException e1) {
    Thread.currentThread().interrupt();
   }
   connect();
  }
 }

 @Override
 public void append(LoggingEvent loggingEvent) {
  connect();

  ThriftFlumeEvent tfe = new ThriftFlumeEvent();

  try {
   String message = layout.format(loggingEvent);
   Map fields = new HashMap();
   tfe.fields = fields;
   tfe.priority = Priority.INFO;
   tfe.timestamp = loggingEvent.getTimeStamp();
   tfe.host = hostname;
   tfe.body = ByteBuffer.wrap(message.getBytes());
  } catch (Exception e) {
   System.out.println(e);
   System.out.println(loggingEvent);
  }
  
  while (true) {
   try {
    client.append(tfe);
    break;
   } catch (TTransportException e) {
    System.out.println(e);
    transport.close();
    try {
     Thread.sleep(1000);
    } catch (InterruptedException e1) {
     Thread.currentThread().interrupt();
    }
    connect();
   } catch (Exception e) {
    System.out.println(e);
   }
  }

 }

 public void connect() {
  if (transport != null && transport.isOpen())
   return;

  if (transport != null && transport.isOpen() == false) {
   transport.close();

  }
  configure();
 }

 public void close() {
  if (transport != null && transport.isOpen()) {
   transport.close();
  }
 }

 public boolean requiresLayout() {
  return true;
 }
}

Node.js version for sending a ThriftFlumeEvent to Flume

This uses Thrift 0.8.0 and Node.js 0.6.10.

var os = require('os');
var Thrift = require('thrift');
var FlumeClient = require('flume/ThriftFlumeEventServer').Client;
var ThriftFlumeTypes = require('flume/flume_eio_types');
var ThriftTransport = require('thrift/lib/thrift/transport').TBufferedTransport;

/**
 * Logger that ships ThriftFlumeEvents to a Flume node over Thrift.
 * Constructing it immediately starts connecting to host:port.
 *
 * Example:
 *   var tfelogger = require("event_logger/logger").ThriftFlumeEventLogger;
 *   var etlogger = new tfelogger("HOST", "PORT");
 *   etlogger.log("project", "event", 0, "version", {"extra" : "property"});
 *
 * @param host the Flume node host
 * @param port the Flume node Thrift port
 */
var ThriftFlumeEventLogger = exports.ThriftFlumeEventLogger = function ThriftFlumeEventLogger(host, port) {
  // Connection target.
  this.host = host;
  this.port = port;
  // Reported as the "host" of every event this logger sends.
  this.hostname = os.hostname();
  // When true, extra diagnostics are printed to the console.
  this.debug = false;
  // Connection and client; filled in by connect().
  this.conn = null;
  this.client = null;
  // Pending writes carried over from a connection that failed.
  this.queue = [];

  this.connect();
};

/**
 * Log an event to Flume.
 *
 * The event body has the form
 *   yyyy-mm-dd HH:MM:ss,SSS<TAB>{"project":...,"event":...,"id":...,"version":...}
 * It is the local time from timeString(), a tab character, then the JSON of the
 * standard fields merged with the extra properties in `json`. Extra properties
 * overwrite standard fields that have the same key.
 *
 * @param project the project name
 * @param event the event name
 * @param id the id to record with the event (e.g. a user's social id, as a long)
 * @param version the code version
 * @param json optional object of extra properties to include in the body
 */
ThriftFlumeEventLogger.prototype.log = function(project, event, id, version, json) {
  var now = new Date();
  var ts = now.valueOf();                   // milliseconds since the epoch
  var ns = now.getMilliseconds() * 1000000; // only the millisecond part, expressed in ns

  var content = {
    "project" : project,
    "event" : event,
    "id" : id,
    "version" : version
  };

  var body = this.timeString(now) + "\t" + JSON.stringify(this.merge(content, json));

  // Package it up for thrift
  var obj = {
    body: body,
    host: this.hostname,
    priority: ThriftFlumeTypes.Priority.INFO,
    fields: {},
    timestamp: ts,
    nanos: ns
  };

  // Convert to the type that thrift wants
  this.send(new ThriftFlumeTypes.ThriftFlumeEvent(obj));
};

/**
 * Create the Thrift connection and client, retrying until a connection is made.
 *
 * Writes saved in this.queue from a failed connection are moved onto the new
 * connection's offline_queue. When the current connection reports an error,
 * its offline_queue is saved to this.queue and a reconnect is scheduled after
 * a short delay. Reconnecting immediately would spin in a tight loop while the
 * Flume node is down. A direct call to connect() replaces any reconnect that
 * is still waiting.
 */
ThriftFlumeEventLogger.prototype.connect = function() {
  var RETRY_DELAY_MS = 1000;
  var self = this;

  if (this.reconnectTimer) {
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
  }

  var scheduleReconnect = function() {
    self.reconnectTimer = setTimeout(function() {
      self.reconnectTimer = null;
      self.connect();
    }, RETRY_DELAY_MS);
  };

  try {
    var conn = Thrift.createConnection(this.host, this.port, {transport: ThriftTransport});
    this.conn = conn;
    if (this.queue.length > 0) {
      conn.offline_queue = conn.offline_queue.concat(this.queue);
      this.queue = [];
    }
    this.client = Thrift.createClient(FlumeClient, conn);

    var failed = false;
    conn.on("error", function(err) {
      console.log("Connection with error: ", err);
      if (self.debug) {
        console.log(conn);
      }
      // React once, and only for the current connection. A late error from a
      // replaced connection must not steal the new connection's queue or spawn
      // extra connections.
      if (failed || self.conn !== conn) {
        return;
      }
      failed = true;
      self.queue = conn.offline_queue;
      scheduleReconnect();
    });
  } catch (e) {
    console.log("Exception on connecting", e);
    // Retry asynchronously: a synchronous self-call recursed until the stack overflowed.
    scheduleReconnect();
  }
};

/**
 * Send the event through the current client.
 *
 * The same event is sent again after a short delay in two cases:
 * - the append callback reports an error;
 * - append throws synchronously, in which case the connection is re-created first.
 *
 * @param toSend the thrift event
 */
ThriftFlumeEventLogger.prototype.send = function(toSend) {
  var RETRY_DELAY_MS = 1000;
  var self = this;
  var retryLater = function() {
    setTimeout(function() {
      self.send(toSend);
    }, RETRY_DELAY_MS);
  };

  if (this.debug) {
    console.log("sending ", toSend);
  }
  try {
    this.client.append(toSend, function(err) {
      // Resend only on failure. Resending unconditionally would repeat the
      // same event forever whenever the callback reports success.
      if (err) {
        console.log("Error on sending", err);
        retryLater();
      }
    });
  } catch (e) {
    console.log("Exception on sending", e);
    this.connect();
    // Deferred retry: a synchronous retry recursed without bound while append
    // kept throwing (e.g. while this.client is still null).
    retryLater();
  }
};

/**
 * Close the connection, if one has been created.
 * Safe to call when no connection was ever established (this.conn is null).
 */
ThriftFlumeEventLogger.prototype.close = function() {
  if (this.conn) {
    this.conn.end();
  }
};
 
/**
 * Format a date in local time as "yyyy-mm-dd HH:MM:ss,SSS".
 * Month, day, hour, minute and second are zero-padded to 2 digits and the
 * milliseconds to 3; the year is used as-is.
 *
 * @param d the date object
 * @return the formatted string
 */
ThriftFlumeEventLogger.prototype.timeString = function(d) {
  var self = this;
  var pad2 = function(n) {
    return self.zeroPad(n, 2);
  };

  var datePart = [d.getFullYear(), pad2(d.getMonth() + 1), pad2(d.getDate())].join("-");
  var timePart = [pad2(d.getHours()), pad2(d.getMinutes()), pad2(d.getSeconds())].join(":");
  var millis = this.zeroPad(d.getMilliseconds(), 3);

  return datePart + " " + timePart + "," + millis;
};

/**
 * Left-pad a value with "0" characters until it is at least `length` long.
 * Values already at least that long are returned unchanged (as a string).
 *
 * @param str the input, string or number
 * @param length the minimum resulting length
 * @return the padded string
 */
ThriftFlumeEventLogger.prototype.zeroPad = function(str, length) {
  var padded = "" + str;
  var missing = length - padded.length;
  for (var i = 0; i < missing; i++) {
    padded = "0" + padded;
  }
  return padded;
};

/**
 * Copy every enumerable property of obj2 (own or inherited) onto obj1,
 * overwriting keys obj1 already has. obj1 is modified in place and returned.
 *
 * @param obj1 the target object
 * @param obj2 the source of extra properties (may be undefined or null)
 * @return obj1
 */
ThriftFlumeEventLogger.prototype.merge = function(obj1, obj2) {
  var target = obj1;
  var source = obj2;
  var name;
  for (name in source) {
    target[name] = source[name];
  }
  return target;
};

gem install issue on Mac OS X Lion

Running gem install [whatever] on Mac OS X Lion with Ruby >= 1.9.2 and RubyGems >= 1.3.7

When this error is seen:
ERROR: http://rubygems.org/ does not appear to be a repository
ERROR: Could not find a valid gem 'whatever' (>= 0) in any repository

It is not a proxy problem if you are sure you are not behind a proxy.
If you don't want to downgrade Ruby to 1.8.7, here is a crude but working workaround.

Visit http://rubygems.org/ and download the .gem file for the gem and for each of its dependencies.
Then change to the directory containing the downloaded files and run gem install again; RubyGems picks up the local .gem files there.

Friday, March 02, 2012