Monday, November 05, 2012
Tuesday, October 16, 2012
Thursday, May 03, 2012
Sunday, April 29, 2012
Thursday, March 08, 2012
Flume Log4j Appender
Using Thrift 0.4.0, Flume 0.9.3-cdh3u0
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import com.cloudera.flume.handlers.thrift.Priority;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Client;
public class FlumeLogAppender extends AppenderSkeleton {
private String hostname;
private String host;
private int port;
private Client client;
private TTransport transport;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public void configure() {
try {
if (hostname == null) {
try {
hostname = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
// can't get hostname
}
}
// Thrift boilerplate code
transport = new TSocket(new Socket(host, port));
TBinaryProtocol protocol = new TBinaryProtocol(transport);
client = new Client(protocol);
} catch (Exception e) {
System.out.println(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
connect();
}
}
@Override
public void append(LoggingEvent loggingEvent) {
connect();
ThriftFlumeEvent tfe = new ThriftFlumeEvent();
try {
String message = layout.format(loggingEvent);
Map fields = new HashMap();
tfe.fields = fields;
tfe.priority = Priority.INFO;
tfe.timestamp = loggingEvent.getTimeStamp();
tfe.host = hostname;
tfe.body = ByteBuffer.wrap(message.getBytes());
} catch (Exception e) {
System.out.println(e);
System.out.println(loggingEvent);
}
while (true) {
try {
client.append(tfe);
break;
} catch (TTransportException e) {
System.out.println(e);
transport.close();
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
connect();
} catch (Exception e) {
System.out.println(e);
}
}
}
public void connect() {
if (transport != null && transport.isOpen())
return;
if (transport != null && transport.isOpen() == false) {
transport.close();
}
configure();
}
public void close() {
if (transport != null && transport.isOpen()) {
transport.close();
}
}
public boolean requiresLayout() {
return true;
}
}
Node JS version for sending ThriftFlumeEvent to Flume
This is using Thrift 0.8.0, Node.js 0.6.10
var os = require('os');
var Thrift = require('thrift');
var FlumeClient = require('flume/ThriftFlumeEventServer').Client;
var ThriftFlumeTypes = require('flume/flume_eio_types');
var ThriftTransport = require('thrift/lib/thrift/transport').TBufferedTransport;
/**
* This provides method to create logger
*
* Example:
* var tfelogger = require("event_logger/logger").ThriftFlumeEventLogger;
* var etlogger = new tfelogger("HOST", "PORT");
* etlogger.log("project", "event", 0, "version", {"extra" : "property"});
*/
var ThriftFlumeEventLogger = exports.ThriftFlumeEventLogger = function ThriftFlumeEventLogger(host, port) {
this.host = host;
this.port = port;
this.hostname = os.hostname();
this.debug = false;
this.conn = null;
this.client = null;
this.queue = [];
this.connect();
};
/**
* log the event to flume in this format:
* yyyy-mm-dd HH:MM:ss,SSS {"json":"String"}
*
* @param project the project name
* @param event the event name
* @param ks_uid the klout soical id in long
* @param code_version the code version
* @param json other properties in json format
*/
ThriftFlumeEventLogger.prototype.log = function(project, event, id, version, json) {
var now = new Date();
var ts = now.valueOf();
var ns = now.getMilliseconds() * 1000000;
var content = {
"project" : project,
"event" : event,
"id" : id,
"version" : version,
};
var body = this.timeString(now) + "\t" + JSON.stringify(this.merge(content,json));
// Package it up for thrift
var obj = {
body: body,
host: this.hostname,
priority: ThriftFlumeTypes.Priority.INFO,
fields: {},
timestamp: ts,
nanos: ns
};
// Convert to the type that thrift wants
this.send(new ThriftFlumeTypes.ThriftFlumeEvent(obj));
};
/**
* Create connection, keep trying until connects
*/
ThriftFlumeEventLogger.prototype.connect = function() {
try {
this.conn = Thrift.createConnection(this.host, this.port, {transport: ThriftTransport});
if (this.queue.length > 0) {
this.conn.offline_queue = this.conn.offline_queue.concat(this.queue);
this.queue = [];
}
this.client = Thrift.createClient(FlumeClient, this.conn);
var self = this;
this.conn.on("error", function(err) {
console.log("Connection with error: ", err);
if (self.debug) {
console.log(self.conn);
}
if (self.conn != null) {
self.queue = self.conn.offline_queue;
}
self.connect();
});
} catch (e) {
console.log("Exception on connecting", e);
this.connect();
}
};
/**
* Send the event
* @param toSend the thrift event
*/
ThriftFlumeEventLogger.prototype.send = function(toSend) {
if (this.debug) {
console.log("sending ", toSend);
}
var self = this;
try {
this.client.append(toSend, function() {
self.send(toSend);
});
} catch (e) {
console.log("Exception on sending", e);
this.connect();
this.send(toSend);
}
};
/**
* Close the connection
*/
ThriftFlumeEventLogger.prototype.close = function() {
this.conn.end();
};
/**
* Create the time in string with the format of "yyyy-mm-dd HH:MM:ss,SSS"
* @param d the date object
* @return the date string
*/
ThriftFlumeEventLogger.prototype.timeString = function(d) {
return d.getFullYear() + "-" + this.zeroPad(d.getMonth() + 1, 2) + "-" + this.zeroPad(d.getDate(), 2) + " " +
this.zeroPad(d.getHours(), 2) + ":" + this.zeroPad(d.getMinutes(), 2) + ":" + this.zeroPad(d.getSeconds(), 2) + "," + this.zeroPad(d.getMilliseconds(), 3);
};
/**
* Return string with additional zero in front when it is less than the length
* @param str the input, can be string or int
* @param length the string length
* @return the string
*/
ThriftFlumeEventLogger.prototype.zeroPad = function(str, length) {
str = str + "";
while (str.length < length) {
str = "0" + str;
}
return str;
};
/**
* Merge two objects and return one
* @param obj1 object 1
* @param obj2 object 2
* @return the merged object
*/
ThriftFlumeEventLogger.prototype.merge = function(obj1, obj2) {
for (var attr in obj2) {
obj1[attr] = obj2[attr];
}
return obj1;
};
gem install issue on Mac Lion
gem install [whatever] on Mac Lion with Ruby >= 1.9.2 and Gem >= 1.3.7
When error is seen:
ERROR: http://rubygems.org/ does not appear to be a repository
ERROR: Could not find a valid gem 'whatever' (>= 0) in any repository
It is not the proxy problem if you sure you are not behind one.
If you don't want to downgrade your Ruby to 1.8.7, here's stupid but working solution.
Visit http://rubygems.org/ and download the gem and all its dependencies from there.
Go to the downloaded directory and install it one more time.
When error is seen:
ERROR: http://rubygems.org/ does not appear to be a repository
ERROR: Could not find a valid gem 'whatever' (>= 0) in any repository
It is not the proxy problem if you sure you are not behind one.
If you don't want to downgrade your Ruby to 1.8.7, here's stupid but working solution.
Visit http://rubygems.org/ and download the gem and all its dependencies from there.
Go to the downloaded directory and install it one more time.
Friday, March 02, 2012
Monday, February 27, 2012
Thursday, February 02, 2012
Wednesday, February 01, 2012
Wednesday, January 04, 2012
Subscribe to:
Posts (Atom)