var os = require('os');
var Thrift = require('thrift');
var FlumeClient = require('flume/ThriftFlumeEventServer').Client;
var ThriftFlumeTypes = require('flume/flume_eio_types');
var ThriftTransport = require('thrift/lib/thrift/transport').TBufferedTransport;
/**
* This provides method to create logger
*
* Example:
* var tfelogger = require("event_logger/logger").ThriftFlumeEventLogger;
* var etlogger = new tfelogger("HOST", "PORT");
* etlogger.log("project", "event", 0, "version", {"extra" : "property"});
*/
var ThriftFlumeEventLogger = exports.ThriftFlumeEventLogger = function ThriftFlumeEventLogger(host, port) {
this.host = host;
this.port = port;
this.hostname = os.hostname();
this.debug = false;
this.conn = null;
this.client = null;
this.queue = [];
this.connect();
};
/**
* log the event to flume in this format:
* yyyy-mm-dd HH:MM:ss,SSS {"json":"String"}
*
* @param project the project name
* @param event the event name
* @param ks_uid the klout soical id in long
* @param code_version the code version
* @param json other properties in json format
*/
ThriftFlumeEventLogger.prototype.log = function(project, event, id, version, json) {
var now = new Date();
var ts = now.valueOf();
var ns = now.getMilliseconds() * 1000000;
var content = {
"project" : project,
"event" : event,
"id" : id,
"version" : version,
};
var body = this.timeString(now) + "\t" + JSON.stringify(this.merge(content,json));
// Package it up for thrift
var obj = {
body: body,
host: this.hostname,
priority: ThriftFlumeTypes.Priority.INFO,
fields: {},
timestamp: ts,
nanos: ns
};
// Convert to the type that thrift wants
this.send(new ThriftFlumeTypes.ThriftFlumeEvent(obj));
};
/**
* Create connection, keep trying until connects
*/
ThriftFlumeEventLogger.prototype.connect = function() {
try {
this.conn = Thrift.createConnection(this.host, this.port, {transport: ThriftTransport});
if (this.queue.length > 0) {
this.conn.offline_queue = this.conn.offline_queue.concat(this.queue);
this.queue = [];
}
this.client = Thrift.createClient(FlumeClient, this.conn);
var self = this;
this.conn.on("error", function(err) {
console.log("Connection with error: ", err);
if (self.debug) {
console.log(self.conn);
}
if (self.conn != null) {
self.queue = self.conn.offline_queue;
}
self.connect();
});
} catch (e) {
console.log("Exception on connecting", e);
this.connect();
}
};
/**
* Send the event
* @param toSend the thrift event
*/
ThriftFlumeEventLogger.prototype.send = function(toSend) {
if (this.debug) {
console.log("sending ", toSend);
}
var self = this;
try {
this.client.append(toSend, function() {
self.send(toSend);
});
} catch (e) {
console.log("Exception on sending", e);
this.connect();
this.send(toSend);
}
};
/**
* Close the connection
*/
ThriftFlumeEventLogger.prototype.close = function() {
this.conn.end();
};
/**
* Create the time in string with the format of "yyyy-mm-dd HH:MM:ss,SSS"
* @param d the date object
* @return the date string
*/
ThriftFlumeEventLogger.prototype.timeString = function(d) {
return d.getFullYear() + "-" + this.zeroPad(d.getMonth() + 1, 2) + "-" + this.zeroPad(d.getDate(), 2) + " " +
this.zeroPad(d.getHours(), 2) + ":" + this.zeroPad(d.getMinutes(), 2) + ":" + this.zeroPad(d.getSeconds(), 2) + "," + this.zeroPad(d.getMilliseconds(), 3);
};
/**
* Return string with additional zero in front when it is less than the length
* @param str the input, can be string or int
* @param length the string length
* @return the string
*/
ThriftFlumeEventLogger.prototype.zeroPad = function(str, length) {
str = str + "";
while (str.length < length) {
str = "0" + str;
}
return str;
};
/**
* Merge two objects and return one
* @param obj1 object 1
* @param obj2 object 2
* @return the merged object
*/
ThriftFlumeEventLogger.prototype.merge = function(obj1, obj2) {
for (var attr in obj2) {
obj1[attr] = obj2[attr];
}
return obj1;
};
Thursday, March 08, 2012
Node JS version for sending ThriftFlumeEvent to Flume
This is using Thrift 0.8.0, Node.js 0.6.10
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment