Thursday, March 08, 2012

Node.js version for sending ThriftFlumeEvent to Flume

This uses Thrift 0.8.0 and Node.js 0.6.10.

var os = require('os');
var Thrift = require('thrift');
var FlumeClient = require('flume/ThriftFlumeEventServer').Client;
var ThriftFlumeTypes = require('flume/flume_eio_types');
var ThriftTransport = require('thrift/lib/thrift/transport').TBufferedTransport;

/**
 * This provides method to create logger
 *  
 * Example: 
 *   var tfelogger = require("event_logger/logger").ThriftFlumeEventLogger;
 *   var etlogger = new tfelogger("HOST", "PORT");
 *   etlogger.log("project", "event", 0, "version", {"extra" : "property"});
 */
var ThriftFlumeEventLogger = exports.ThriftFlumeEventLogger = function ThriftFlumeEventLogger(host, port) {
 this.host = host;
 this.port = port;
    this.hostname = os.hostname();
    this.debug = false;
 this.conn = null;
 this.client = null;
 this.queue = [];
 this.connect();
};

/**
 * Build a Flume event from the given properties and send it.
 *
 * The event body has the form
 *   yyyy-mm-dd HH:MM:ss,SSS<TAB>{"json":"String"}
 * where the JSON object holds "project", "event", "id" and "version",
 * followed by the properties of `json`. A property in `json` with one of
 * those four names replaces the standard value.
 *
 * @param project the project name
 * @param event the event name
 * @param id the value stored under the "id" key
 * @param version the value stored under the "version" key
 * @param json object with additional properties to merge into the payload
 */
ThriftFlumeEventLogger.prototype.log = function(project, event, id, version, json) {
    var stamp = new Date();

    var payload = {
        "project" : project,
        "event" : event,
        "id" : id,
        "version" : version
    };

    // Timestamp prefix and JSON payload are separated by a tab.
    var line = this.timeString(stamp) + "\t" + JSON.stringify(this.merge(payload, json));

    // Wrap the fields in the type that thrift wants and hand it to send().
    var flumeEvent = new ThriftFlumeTypes.ThriftFlumeEvent({
        body: line,
        host: this.hostname,
        priority: ThriftFlumeTypes.Priority.INFO,
        fields: {},
        // Milliseconds since the epoch.
        timestamp: stamp.getTime(),
        // Millisecond part of the timestamp, scaled to nanoseconds.
        nanos: stamp.getMilliseconds() * 1000000
    });

    this.send(flumeEvent);
};

/**
 * Create the Thrift connection and the Flume client, retrying until it works.
 *
 * Any entries saved in this.queue (taken from a failed connection's
 * offline_queue) are appended to the new connection's offline_queue.
 *
 * A failed attempt, or an "error" event from the current connection, schedules
 * one later retry with setTimeout instead of calling connect() again right
 * away. Retrying synchronously from the catch block recursed without bound
 * while the failure persisted, and reconnecting immediately from the error
 * handler retried in a tight loop while the server was unreachable.
 *
 * The retry delay is this.reconnectDelay milliseconds when that property is a
 * number, otherwise 1000. The pending timer is kept in this.reconnectTimer.
 */
ThriftFlumeEventLogger.prototype.connect = function() {
    var self = this;

    // Schedule a single retry; further requests are ignored while one is pending.
    function retryLater() {
        if (self.reconnectTimer) {
            return;
        }
        var delay = typeof self.reconnectDelay === "number" ? self.reconnectDelay : 1000;
        self.reconnectTimer = setTimeout(function() {
            self.reconnectTimer = null;
            self.connect();
        }, delay);
    }

    // An explicit connect() supersedes any retry that is still waiting,
    // otherwise the timer would later replace the connection made here.
    if (this.reconnectTimer) {
        clearTimeout(this.reconnectTimer);
        this.reconnectTimer = null;
    }

    try {
        var conn = Thrift.createConnection(this.host, this.port, {transport: ThriftTransport});
        this.conn = conn;
        if (this.queue.length > 0) {
            // Carry over what the previous connection had not sent.
            conn.offline_queue = conn.offline_queue.concat(this.queue);
            this.queue = [];
        }
        this.client = Thrift.createClient(FlumeClient, conn);
        conn.on("error", function(err) {
            console.log("Connection with error: ", err);
            if (self.debug) {
                console.log(self.conn);
            }
            // A connection that has already been replaced must not trigger
            // another reconnect; that would orphan the current connection.
            if (self.conn !== conn) {
                return;
            }
            if (conn.offline_queue) {
                self.queue = conn.offline_queue;
            }
            retryLater();
        });
    } catch (e) {
        console.log("Exception on connecting", e);
        retryLater();
    }
};

/**
 * Send the event through the Flume client.
 *
 * If the client throws, the logger reconnects and sends the event again. If
 * the client's callback reports an error, the event is sent again. Each event
 * is retried at most 3 times; after that it is logged and dropped. Previously
 * the callback re-sent the event unconditionally and the catch path retried
 * without limit, so a persistent failure recursed until the stack overflowed.
 *
 * NOTE(review): assumes the client invokes the callback Node-style, with an
 * error as the first argument on failure and a falsy value otherwise --
 * confirm against the generated Thrift client.
 *
 * @param toSend the thrift event
 * @param attempt optional; number of retries already made for this event
 *                (callers normally omit it)
 */
ThriftFlumeEventLogger.prototype.send = function(toSend, attempt) {
    var maxRetries = 3;
    var self = this;
    attempt = attempt || 0;

    if (this.debug) {
        console.log("sending ", toSend);
    }

    try {
        this.client.append(toSend, function(err) {
            // Only a reported failure warrants sending the same event again.
            if (!err) {
                return;
            }
            console.log("Error on sending", err);
            if (attempt >= maxRetries) {
                console.log("Giving up on event after " + attempt + " retries");
                return;
            }
            self.send(toSend, attempt + 1);
        });
    } catch (e) {
        console.log("Exception on sending", e);
        if (attempt >= maxRetries) {
            console.log("Giving up on event after " + attempt + " retries");
            return;
        }
        this.connect();
        this.send(toSend, attempt + 1);
    }
};

/**
 * Close the connection by calling end() on it.
 *
 * Does nothing when no connection object exists; conn is initialised to
 * null and is only assigned by connect().
 */
ThriftFlumeEventLogger.prototype.close = function() {
    if (this.conn) {
        this.conn.end();
    }
};
 
/**
 * Format a date as "yyyy-mm-dd HH:MM:ss,SSS" using the date's local-time
 * fields. Every field except the year is zero-padded with zeroPad().
 * @param d the date object
 * @return the date string
 */
ThriftFlumeEventLogger.prototype.timeString = function(d) {
    var datePart = [
        d.getFullYear(),
        this.zeroPad(d.getMonth() + 1, 2), // getMonth() is zero-based
        this.zeroPad(d.getDate(), 2)
    ].join("-");

    var clockPart = [
        this.zeroPad(d.getHours(), 2),
        this.zeroPad(d.getMinutes(), 2),
        this.zeroPad(d.getSeconds(), 2)
    ].join(":");

    return datePart + " " + clockPart + "," + this.zeroPad(d.getMilliseconds(), 3);
};

/**
 * Left-pad a value with "0" characters until it is at least `length`
 * characters long. Values that are already long enough are returned
 * unchanged (as strings).
 * @param str the input, can be string or int
 * @param length the minimum string length
 * @return the padded string
 */
ThriftFlumeEventLogger.prototype.zeroPad = function(str, length) {
    var padded = "" + str;
    // Prepend one zero per missing character.
    for (var missing = length - padded.length; missing > 0; missing--) {
        padded = "0" + padded;
    }
    return padded;
};

/**
 * Copy every enumerable property of obj2 onto obj1 and return obj1.
 * obj1 is modified in place; on a name clash the value from obj2 wins.
 * @param obj1 the object that receives the properties
 * @param obj2 the object whose properties are copied
 * @return obj1, after the copy
 */
ThriftFlumeEventLogger.prototype.merge = function(obj1, obj2) {
    var target = obj1;
    var key;
    for (key in obj2) {
        target[key] = obj2[key];
    }
    return target;
};

No comments: