[xmppd-dev] commit r1554 - in trunk/jabberd14: jabberd jabberd/lib resolver
mail at jabberd.org
mail at jabberd.org
Tue Mar 17 01:09:35 CET 2009
Author: mawis
Date: Tue Mar 17 01:09:35 2009
New Revision: 1554
Log:
Work in progress on the new DNS resolver.
- still with bugs (I expect)
- expiring of queries not yet implemented
- correct ordering of result SRV records not yet implemented
Added:
trunk/jabberd14/resolver/resend_service.cc (contents, props changed)
- copied, changed from r1539, trunk/jabberd14/resolver/resolver.cc
trunk/jabberd14/resolver/resolver_job.cc (contents, props changed)
- copied, changed from r1539, trunk/jabberd14/resolver/resolver.cc
Modified:
trunk/jabberd14/jabberd/deliver.cc
trunk/jabberd14/jabberd/lib/jabberdlib.h
trunk/jabberd14/jabberd/lib/lwresc.cc
trunk/jabberd14/resolver/Makefile.am
trunk/jabberd14/resolver/resolver.cc
trunk/jabberd14/resolver/resolver.h
Modified: trunk/jabberd14/jabberd/deliver.cc
==============================================================================
--- trunk/jabberd14/jabberd/deliver.cc Mon Mar 16 18:21:38 2009 (r1553)
+++ trunk/jabberd14/jabberd/deliver.cc Tue Mar 17 01:09:35 2009 (r1554)
@@ -138,7 +138,6 @@
*/
typedef struct deliver_mp_st {
pth_message_t head; /**< the standard pth message header */
- instance i; /**< the sending instance */
dpacket p; /**< the queued packet */
} _deliver_msg,*deliver_msg;
@@ -653,7 +652,7 @@
* deliver a ::dpacket to an ::instance using the configured XML routings
*
* @param p the packet that should be delivered (packet gets consumed)
- * @param i the instance of the sender (!) of the packet
+ * @param i unused/ignored (was: the instance of the sender (!) of the packet)
*/
void deliver(dpacket p, instance i) {
ilist a, b;
@@ -667,7 +666,7 @@
/* begin delivery of postponed messages */
deliver_msg d;
while ((d=(deliver_msg)pth_msgport_get(deliver__mp))!=NULL) {
- deliver(d->p,d->i);
+ deliver(d->p, NULL);
}
pth_msgport_destroy(deliver__mp);
deliver__mp = NULL;
@@ -691,7 +690,6 @@
if (deliver__mp == NULL)
deliver__mp = pth_msgport_create("deliver__");
- d->i = i;
d->p = p;
pth_msgport_put(deliver__mp, reinterpret_cast<pth_message_t*>(d));
Modified: trunk/jabberd14/jabberd/lib/jabberdlib.h
==============================================================================
--- trunk/jabberd14/jabberd/lib/jabberdlib.h Mon Mar 16 18:21:38 2009 (r1553)
+++ trunk/jabberd14/jabberd/lib/jabberdlib.h Tue Mar 17 01:09:35 2009 (r1554)
@@ -1455,6 +1455,8 @@
};
class rrecord {
+ public:
+ virtual ~rrecord();
};
class srv_record : public rrecord {
Modified: trunk/jabberd14/jabberd/lib/lwresc.cc
==============================================================================
--- trunk/jabberd14/jabberd/lib/lwresc.cc Mon Mar 16 18:21:38 2009 (r1553)
+++ trunk/jabberd14/jabberd/lib/lwresc.cc Tue Mar 17 01:09:35 2009 (r1554)
@@ -357,6 +357,9 @@
}
}
+ rrecord::~rrecord() {
+ }
+
srv_record::srv_record(std::istream& is) {
try {
uint16_t rrlen = lwresult::read_uint16(is);
Modified: trunk/jabberd14/resolver/Makefile.am
==============================================================================
--- trunk/jabberd14/resolver/Makefile.am Mon Mar 16 18:21:38 2009 (r1553)
+++ trunk/jabberd14/resolver/Makefile.am Tue Mar 17 01:09:35 2009 (r1554)
@@ -2,7 +2,7 @@
noinst_HEADERS = resolver.h
-libjabberdresolver_la_SOURCES = resolver.cc
+libjabberdresolver_la_SOURCES = resend_service.cc resolver.cc resolver_job.cc
libjabberdresolver_la_LIBADD = $(top_builddir)/jabberd/libjabberd.la
libjabberdresolver_la_LDFLAGS = @LDFLAGS@ @VERSION_INFO@ -module -version-info 2:0:0
Copied and modified: trunk/jabberd14/resolver/resend_service.cc (from r1539, trunk/jabberd14/resolver/resolver.cc)
==============================================================================
--- trunk/jabberd14/resolver/resolver.cc Wed Sep 10 02:35:57 2008 (r1539, copy source)
+++ trunk/jabberd14/resolver/resend_service.cc Tue Mar 17 01:09:35 2009 (r1554)
@@ -1,7 +1,7 @@
/*
* Copyrights
*
- * Copyright (c) 2008 Matthias Wimmer
+ * Copyright (c) 2008/2009 Matthias Wimmer
*
* This file is part of jabberd14.
*
@@ -33,119 +33,6 @@
namespace xmppd {
namespace resolver {
- resolver::resolver(instance i, xmlnode x) : instance_base(i, x), lwresd_socket(NULL), queue_timeout(60), lwresd_host("localhost"), lwresd_service("921") {
- configurate();
-
- open_lwresd_socket();
- }
-
- void resolver::open_lwresd_socket() {
- int udp_socket = make_netsocket2(lwresd_service, lwresd_host, NETSOCKET_UDP);
-
- lwresd_socket = mio_new(udp_socket, mio_callback, this, MIO_CONNECT_RAW);
- }
-
- void resolver::mio_callback(mio m, int state, void* arg, xmlnode x, char* buffer, int bufsz) {
- // sanity check
- if (!arg) {
- return;
- }
-
- // make everything a bit nicer and call mio_event
- static_cast<resolver*>(arg)->mio_event(m, state, buffer ? std::string(buffer, bufsz) : std::string());
- }
-
- void resolver::mio_event(mio m, int state, std::string const& buffer) {
- switch (state) {
- case MIO_CLOSED:
- mio_event_closed(m);
- break;
- case MIO_BUFFER:
- mio_event_buffer(m, buffer);
- break;
- case MIO_ERROR:
- default:
- mio_event_error(m);
- }
- }
-
- void resolver::mio_event_buffer(mio m, std::string const& buffer) {
- // parse the result
- std::istringstream buffer_stream(buffer);
- xmppd::lwresc::lwresult query_result(buffer_stream);
-
- // send the signal for this result
- uint32_t serial = query_result.getSerial();
- std::map<uint32_t, std::pair<time_t, sigc::signal<void, xmppd::lwresc::lwresult const&> > >::iterator result_listener = result_listeners.find(serial);
- if (result_listener != result_listeners.end()) {
- result_listener->second.second.emit(query_result);
- result_listeners.erase(result_listener);
- }
-
- }
-
- void resolver::mio_event_closed(mio m) {
- }
-
- void resolver::mio_event_error(mio m) {
- }
-
- std::list<resend_service> const& resolver::get_resend_services() {
- return resend_services;
- }
-
- void resolver::configurate() {
- // get the configuration
- xmlnode config = get_instance_config();
-
- // get and iterate the resend childs
- xht namespaces = xhash_new(3);
- xhash_put(namespaces, "dnsrv", const_cast<char*>(NS_JABBERD_CONFIG_DNSRV));
- xmlnode_vector resend_elements = xmlnode_get_tags(config, "dnsrv:resend", namespaces);
- for (xmlnode_vector::iterator p = resend_elements.begin(); p != resend_elements.end(); ++p) {
- try {
- resend_services.push_back(resend_service(*p));
- } catch (std::invalid_argument) {
- }
- }
-
- // get the queue timeout (time we are waiting to a DNS resolving result
- char const* queuetimeout_attrib = xmlnode_get_attrib_ns(config, "queuetimeout", NULL);
- if (queuetimeout_attrib) {
- std::istringstream queuetimeout_stream(queuetimeout_attrib);
- queuetimeout_stream >> queue_timeout;
-
- // a timeout of less than 10 seconds does not seem to make sense
- if (queue_timeout < 10) {
- queue_timeout = 10;
- }
- } else {
- queue_timeout = 60;
- }
- set_heartbeat_interval(queue_timeout);
-
- // free temp resources
- xhash_free(namespaces);
- namespaces = NULL;
- }
-
- void resolver::send_query(xmppd::lwresc::lwquery const& query) {
- // get binary representation of the query
- std::ostringstream query_bin;
- query_bin << query;
-
- // send it
- mio_write(lwresd_socket, NULL, query_bin.str().c_str(), query_bin.str().length());
- }
-
- sigc::connection resolver::register_result_callback(uint32_t serial, sigc::signal<void, xmppd::lwresc::lwresult const&>::slot_type const& callback) {
- if (result_listeners.find(serial) == result_listeners.end()) {
- result_listeners[serial] = std::pair<time_t, sigc::signal<void, xmppd::lwresc::lwresult const&> >(std::time(NULL), sigc::signal<void, xmppd::lwresc::lwresult const&>());
- }
-
- return result_listeners[serial].second.connect(callback);
- }
-
resend_service::resend_service(xmlnode resend) : weight_sum(0) {
char const* service_attribute_value = xmlnode_get_attrib_ns(resend, "service", NULL);
@@ -215,132 +102,18 @@
return service;
}
- result resolver::on_stanza_packet(dpacket dp) {
- // sanity check
- if (!dp || !dp->host)
- return r_ERR;
-
- // check if the packet already has been resolved (in the case of looping packets)
- if (xmlnode_get_attrib_ns(dp->x, "ip", NULL) || xmlnode_get_attrib_ns(dp->x, "iperror", NULL)) {
- char const* packet_type = xmlnode_get_attrib_ns(dp->x, "type", NULL);
-
- // drop type='error', bounce everything else
- if (packet_type && Glib::ustring(packet_type) == "error") {
- log_warn(dp->host, "Looping DNS request. Dropped: %s", xmlnode_serialize_string(dp->x, xmppd::ns_decl_list(), 0));
- xmlnode_free(dp->x);
- } else {
- deliver_fail(dp, N_("Looping DNS request. Dropped."));
- }
-
- return r_DONE;
- }
-
- // is there already a resolve request pending for this domain? Just add to queue for this resolving
- if (pending_jobs.find(dp->host) != pending_jobs.end()) {
- pending_jobs[dp->host]->add_packet(dp);
- return r_DONE;
- }
-
- // store the packet, so that we can forward it when it has been resolved, and start resolving
- pending_jobs[dp->host] = new resolver_job(*this, dp);
- return r_DONE;
- }
-
- resolver_job::resolver_job(resolver& owner, dpacket dp) : owner(owner), waited_srv_serial(0) {
- // sanity check
- if (!dp->host) {
- throw std::invalid_argument("dpacket has no host");
- }
-
- // keep the packet
- add_packet(dp);
-
- // keep destination explicitly for faster access
- destination = dp->host;
-
- // get the services and resend destinations that we have to use (make copy)
- resend_services = owner.get_resend_services();
-
- // set current service
- current_service = resend_services.begin();
-
- // start resolving this service
- start_resolving_service();
- }
-
- resolver_job::~resolver_job() {
- // disconnect all signals pointing to us
- for (std::list<sigc::connection>::iterator p = connected_signals.begin(); p != connected_signals.end(); ++p) {
- p->disconnect();
- }
- }
-
- void resolver_job::add_packet(dpacket dp) {
- waiting_packets.push_back(dp);
- }
-
- void resolver_job::start_resolving_service() {
- // reset the list of providing hosts
- providing_hosts.erase(providing_hosts.begin(), providing_hosts.end());
-
- // do we have a service, or do have have do plain AAAA+A queries?
- if (current_service->is_explicit_service()) {
- // need to do SRV lookup
- //
- // create query
- std::ostringstream name_to_resolve;
- name_to_resolve << std::string(current_service->get_service_prefix()) << "." << std::string(destination);
-
- xmppd::lwresc::rrsetbyname query(name_to_resolve.str(), ns_c_in, ns_t_srv);
-
- // register result callback
- waited_srv_serial = query.getSerial();
- connected_signals.push_back(owner.register_result_callback(query.getSerial(), sigc::mem_fun(*this, &xmppd::resolver::resolver_job::on_srv_query_result)));
-
- // send query
- owner.send_query(query);
- } else {
- // no SRV lookup, just plain AAAA+A
-
- // the hosts providing the service is just the destination on port 5269
- // so put this in providing_hosts list
- providing_hosts.push_back(std::pair<Glib::ustring, Glib::ustring>(destination, "5269"));
+ xmppd::jabberid resend_service::get_resend_host() const {
+ int host_die = std::rand() % weight_sum;
- // no real SRV lookup step needed, so directly start the AAAA+A query stap
- resolve_providing_hosts();
- }
- }
+ for (std::list< std::pair<int, xmppd::jabberid> >::const_iterator p = resend_hosts.begin(); p != resend_hosts.end(); ++p) {
+ host_die -= p->first;
- void resolver_job::resolve_providing_hosts() {
- // XXX implement this method
- }
-
- void resolver_job::on_srv_query_result(xmppd::lwresc::lwresult const& result) {
- // ignore all results we are not waiting for
- if (result.getSerial() != waited_srv_serial)
- return;
-
- // did we successfully get a result?
- if (result.getResult() != xmppd::lwresc::lwresult::res_success) {
- // try next service
- ++current_service;
- start_resolving_service();
- return;
+ if (host_die <= 0) {
+ return p->second;
+ }
}
- // XXX implement this method
+ return resend_hosts.begin()->second;
}
}
}
-
-/**
- * init and register the resolver component in the server
- *
- * @todo care for destructing the resolver instance on shutdown
- *
- * @param i the jabber server's data about this instance
- * @param x xmlnode of this instances configuration (???)
- */
-extern "C" void resolver(instance i, xmlnode x) {
- xmppd::resolver::resolver* ri = new xmppd::resolver::resolver(i, x);
-}
Modified: trunk/jabberd14/resolver/resolver.cc
==============================================================================
--- trunk/jabberd14/resolver/resolver.cc Mon Mar 16 18:21:38 2009 (r1553)
+++ trunk/jabberd14/resolver/resolver.cc Tue Mar 17 01:09:35 2009 (r1554)
@@ -1,7 +1,7 @@
/*
* Copyrights
*
- * Copyright (c) 2008 Matthias Wimmer
+ * Copyright (c) 2008/2009 Matthias Wimmer
*
* This file is part of jabberd14.
*
@@ -146,75 +146,6 @@
return result_listeners[serial].second.connect(callback);
}
- resend_service::resend_service(xmlnode resend) : weight_sum(0) {
- char const* service_attribute_value = xmlnode_get_attrib_ns(resend, "service", NULL);
-
- // if there is a service attribute, keep it
- if (service_attribute_value)
- service = service_attribute_value;
-
- // check, get and iterate partial childs
- xht namespaces = xhash_new(3);
- xhash_put(namespaces, "dnsrv", const_cast<char*>(NS_JABBERD_CONFIG_DNSRV));
- xmlnode_vector partial_elements = xmlnode_get_tags(resend, "dnsrv:partial", namespaces);
- xhash_free(namespaces);
- namespaces = NULL;
- for (xmlnode_vector::iterator p = partial_elements.begin(); p != partial_elements.end(); ++p) {
- // get the weight for this partial destination
- char const* weight_attrib = xmlnode_get_attrib_ns(*p, "weight", NULL);
- int weight = 1;
- if (weight_attrib) {
- std::istringstream weight_stream(weight_attrib);
- weight_stream >> weight;
- }
- if (weight < 1)
- weight = 1;
-
- // get the destination
- char const* resend_dest = xmlnode_get_data(*p);
- if (!resend_dest)
- continue;
- try {
- xmppd::jabberid resend_jid(resend_dest);
-
- // keep
- resend_hosts.push_back(std::pair<int, xmppd::jabberid>(weight, resend_jid));
- weight_sum += weight;
- } catch (std::invalid_argument) {
- continue;
- }
-
- }
-
- // if there where no partial childs, use the text() child of the <resend/> element
- if (resend_hosts.empty()) {
- try {
- char const* resend_dest = xmlnode_get_data(resend);
- if (resend_dest) {
- xmppd::jabberid resend_jid(resend_dest);
-
- // keep
- resend_hosts.push_back(std::pair<int, xmppd::jabberid>(1, resend_jid));
- weight_sum++;
- }
- } catch (std::invalid_argument) {
- }
- }
-
- // still no valid resend_hosts?
- if (resend_hosts.empty()) {
- throw std::invalid_argument("resend config contains no valid destination");
- }
- }
-
- bool resend_service::is_explicit_service() const {
- return service.length() > 0;
- }
-
- Glib::ustring const& resend_service::get_service_prefix() const {
- return service;
- }
-
result resolver::on_stanza_packet(dpacket dp) {
// sanity check
if (!dp || !dp->host)
@@ -243,92 +174,44 @@
// store the packet, so that we can forward it when it has been resolved, and start resolving
pending_jobs[dp->host] = new resolver_job(*this, dp);
+ pending_jobs[dp->host]->register_result_callback(sigc::mem_fun(*this, &xmppd::resolver::resolver::handle_completed_job));
return r_DONE;
}
- resolver_job::resolver_job(resolver& owner, dpacket dp) : owner(owner), waited_srv_serial(0) {
- // sanity check
- if (!dp->host) {
- throw std::invalid_argument("dpacket has no host");
- }
-
- // keep the packet
- add_packet(dp);
-
- // keep destination explicitly for faster access
- destination = dp->host;
-
- // get the services and resend destinations that we have to use (make copy)
- resend_services = owner.get_resend_services();
-
- // set current service
- current_service = resend_services.begin();
-
- // start resolving this service
- start_resolving_service();
- }
-
- resolver_job::~resolver_job() {
- // disconnect all signals pointing to us
- for (std::list<sigc::connection>::iterator p = connected_signals.begin(); p != connected_signals.end(); ++p) {
- p->disconnect();
- }
- }
-
- void resolver_job::add_packet(dpacket dp) {
- waiting_packets.push_back(dp);
- }
-
- void resolver_job::start_resolving_service() {
- // reset the list of providing hosts
- providing_hosts.erase(providing_hosts.begin(), providing_hosts.end());
-
- // do we have a service, or do have have do plain AAAA+A queries?
- if (current_service->is_explicit_service()) {
- // need to do SRV lookup
- //
- // create query
- std::ostringstream name_to_resolve;
- name_to_resolve << std::string(current_service->get_service_prefix()) << "." << std::string(destination);
-
- xmppd::lwresc::rrsetbyname query(name_to_resolve.str(), ns_c_in, ns_t_srv);
-
- // register result callback
- waited_srv_serial = query.getSerial();
- connected_signals.push_back(owner.register_result_callback(query.getSerial(), sigc::mem_fun(*this, &xmppd::resolver::resolver_job::on_srv_query_result)));
-
- // send query
- owner.send_query(query);
+ void resolver::resend_packet(xmlnode pkt, Glib::ustring ips, Glib::ustring to) {
+ if (ips.empty()) {
+ jutil_error_xmpp(pkt, (xterror){502, N_("Unable to resolve hostname."), "wait", "service-unavailable"});
+ xmlnode_put_attrib_ns(pkt, "iperror", NULL, NULL, "");
} else {
- // no SRV lookup, just plain AAAA+A
-
- // the hosts providing the service is just the destination on port 5269
- // so put this in providing_hosts list
- providing_hosts.push_back(std::pair<Glib::ustring, Glib::ustring>(destination, "5269"));
+ char const* dnsresultto = xmlnode_get_attrib_ns(pkt, "dnsqueryby", NULL);
+ if (!dnsresultto) {
+ dnsresultto = to.c_str();
+ }
- // no real SRV lookup step needed, so directly start the AAAA+A query stap
- resolve_providing_hosts();
+ pkt = xmlnode_wrap_ns(pkt, "route", NULL, NULL);
+ xmlnode_put_attrib_ns(pkt, "to", NULL, NULL, dnsresultto);
+ xmlnode_put_attrib_ns(pkt, "ip", NULL, NULL, ips.c_str());
}
+ deliver(pkt);
}
- void resolver_job::resolve_providing_hosts() {
- // XXX implement this method
- }
+ void resolver::handle_completed_job(resolver_job& job) {
+ // get the packets
+ std::list<dpacket> const& packets = job.get_packets();
- void resolver_job::on_srv_query_result(xmppd::lwresc::lwresult const& result) {
- // ignore all results we are not waiting for
- if (result.getSerial() != waited_srv_serial)
- return;
+ // get the resend destination
+ Glib::ustring resend_host = job.get_resend_host().full();
- // did we successfully get a result?
- if (result.getResult() != xmppd::lwresc::lwresult::res_success) {
- // try next service
- ++current_service;
- start_resolving_service();
- return;
+ // get the resolved ips
+ Glib::ustring ips = job.get_result();
+
+ // resend the packets
+ for (std::list<dpacket>::const_iterator p = packets.begin(); p != packets.end(); ++p) {
+ resend_packet((*p)->x, ips, resend_host);
}
- // XXX implement this method
+ // we are done with it, we can delete the job
+ delete &job;
}
}
}
Modified: trunk/jabberd14/resolver/resolver.h
==============================================================================
--- trunk/jabberd14/resolver/resolver.h Mon Mar 16 18:21:38 2009 (r1553)
+++ trunk/jabberd14/resolver/resolver.h Tue Mar 17 01:09:35 2009 (r1554)
@@ -56,6 +56,12 @@
*/
Glib::ustring const& get_service_prefix() const;
+ /**
+ * select one of the resend_hosts for this service
+ *
+ * @return chosen resend_host
+ */
+ xmppd::jabberid get_resend_host() const;
private:
/**
* the service to resolve, empty string for no service but plain AAAA/A lookups
@@ -107,6 +113,36 @@
* @throws std::invalid_argument if the packet has a different destination, that what is being resolved by this job
*/
void add_packet(dpacket dp);
+
+ /**
+ * get the waiting packets
+ *
+ * @return list of waiting packets
+ */
+ std::list<dpacket> const& get_packets() const;
+
+ /**
+ * register for being notified on finishing the job
+ *
+ * @param callback what should get notified on finishing the job
+ */
+ sigc::connection register_result_callback(sigc::signal<void, resolver_job&>::slot_type const& callback);
+
+ /**
+ * get the resolving result
+ *
+ * @return string containing IP/port pairs as the result to connect to
+ */
+ Glib::ustring get_result() const;
+
+ /**
+ * get the service to which packets resolved by this job should be sent
+ *
+ * @return the service
+ */
+ xmppd::jabberid get_resend_host() const;
+
+
private:
/**
* the destination, that is being resolved by this job
@@ -150,6 +186,11 @@
std::list< std::pair<Glib::ustring, Glib::ustring> > providing_hosts;
/**
+ * the providing host, that is currently resolved
+ */
+ std::list< std::pair<Glib::ustring, Glib::ustring> >::const_iterator current_providing_host;
+
+ /**
* do AAAA and A lookups for the hosts in providing_hosts
*
* This gets called after providing_hosts has been filled, either explicitly
@@ -159,6 +200,18 @@
void resolve_providing_hosts();
/**
+ * do AAAA and A lookups for the current_providing_host
+ *
+ * This gets called while iterating the entries in providing_hosts after updating current_providing_host
+ */
+ void resolve_current_providing_host();
+
+ /**
+ * do the remaining A lookup after the AAAA lookup has been done for the current_providing_host
+ */
+ void resolve_current_providing_host_a();
+
+ /**
* list of signals to disconnect on destruction
*/
std::list<sigc::connection> connected_signals;
@@ -169,9 +222,24 @@
void on_srv_query_result(xmppd::lwresc::lwresult const& result);
/**
- * the serial for the SRV query that we have sent last
+ * handle results of AAAA queries to the DNS
+ */
+ void on_aaaa_query_result(xmppd::lwresc::lwresult const& result);
+
+ /**
+ * handle results of A queries to the DNS
+ */
+ void on_a_query_result(xmppd::lwresc::lwresult const& result);
+
+ /**
+ * buffer taking the result of resolving the job
*/
- uint32_t waited_srv_serial;
+ std::ostringstream result_buffer;
+
+ /**
+ * listeners to get notified if the resolver_job is finished
+ */
+ std::list<sigc::signal<void, resolver_job&> > result_listeners;
};
/**
@@ -212,6 +280,24 @@
private:
/**
+ * resend a resolved packet to the configured service
+ *
+ * the packet may override the destination it gets resent to (using its dnsqueryby attribute)
+ *
+ * @param pkt the packet to resend
+ * @param ips the resolving result to add to the packet
+ * @param to the service to send the packet to (if not overridden by the pkt itself)
+ */
+ void resend_packet(xmlnode pkt, Glib::ustring ips, Glib::ustring to);
+
+ /**
+ * handles completed resolvings
+ *
+ * @param job the resolver_job that completed
+ */
+ void handle_completed_job(resolver_job& job);
+
+ /**
* handle received stanzas
*
* @param dp the stanza to handle
Copied and modified: trunk/jabberd14/resolver/resolver_job.cc (from r1539, trunk/jabberd14/resolver/resolver.cc)
==============================================================================
--- trunk/jabberd14/resolver/resolver.cc Wed Sep 10 02:35:57 2008 (r1539, copy source)
+++ trunk/jabberd14/resolver/resolver_job.cc Tue Mar 17 01:09:35 2009 (r1554)
@@ -1,7 +1,7 @@
/*
* Copyrights
*
- * Copyright (c) 2008 Matthias Wimmer
+ * Copyright (c) 2008/2009 Matthias Wimmer
*
* This file is part of jabberd14.
*
@@ -30,223 +30,11 @@
*/
#include <resolver.h>
+#include <iostream>
namespace xmppd {
namespace resolver {
- resolver::resolver(instance i, xmlnode x) : instance_base(i, x), lwresd_socket(NULL), queue_timeout(60), lwresd_host("localhost"), lwresd_service("921") {
- configurate();
-
- open_lwresd_socket();
- }
-
- void resolver::open_lwresd_socket() {
- int udp_socket = make_netsocket2(lwresd_service, lwresd_host, NETSOCKET_UDP);
-
- lwresd_socket = mio_new(udp_socket, mio_callback, this, MIO_CONNECT_RAW);
- }
-
- void resolver::mio_callback(mio m, int state, void* arg, xmlnode x, char* buffer, int bufsz) {
- // sanity check
- if (!arg) {
- return;
- }
-
- // make everything a bit nicer and call mio_event
- static_cast<resolver*>(arg)->mio_event(m, state, buffer ? std::string(buffer, bufsz) : std::string());
- }
-
- void resolver::mio_event(mio m, int state, std::string const& buffer) {
- switch (state) {
- case MIO_CLOSED:
- mio_event_closed(m);
- break;
- case MIO_BUFFER:
- mio_event_buffer(m, buffer);
- break;
- case MIO_ERROR:
- default:
- mio_event_error(m);
- }
- }
-
- void resolver::mio_event_buffer(mio m, std::string const& buffer) {
- // parse the result
- std::istringstream buffer_stream(buffer);
- xmppd::lwresc::lwresult query_result(buffer_stream);
-
- // send the signal for this result
- uint32_t serial = query_result.getSerial();
- std::map<uint32_t, std::pair<time_t, sigc::signal<void, xmppd::lwresc::lwresult const&> > >::iterator result_listener = result_listeners.find(serial);
- if (result_listener != result_listeners.end()) {
- result_listener->second.second.emit(query_result);
- result_listeners.erase(result_listener);
- }
-
- }
-
- void resolver::mio_event_closed(mio m) {
- }
-
- void resolver::mio_event_error(mio m) {
- }
-
- std::list<resend_service> const& resolver::get_resend_services() {
- return resend_services;
- }
-
- void resolver::configurate() {
- // get the configuration
- xmlnode config = get_instance_config();
-
- // get and iterate the resend childs
- xht namespaces = xhash_new(3);
- xhash_put(namespaces, "dnsrv", const_cast<char*>(NS_JABBERD_CONFIG_DNSRV));
- xmlnode_vector resend_elements = xmlnode_get_tags(config, "dnsrv:resend", namespaces);
- for (xmlnode_vector::iterator p = resend_elements.begin(); p != resend_elements.end(); ++p) {
- try {
- resend_services.push_back(resend_service(*p));
- } catch (std::invalid_argument) {
- }
- }
-
- // get the queue timeout (time we are waiting to a DNS resolving result
- char const* queuetimeout_attrib = xmlnode_get_attrib_ns(config, "queuetimeout", NULL);
- if (queuetimeout_attrib) {
- std::istringstream queuetimeout_stream(queuetimeout_attrib);
- queuetimeout_stream >> queue_timeout;
-
- // a timeout of less than 10 seconds does not seem to make sense
- if (queue_timeout < 10) {
- queue_timeout = 10;
- }
- } else {
- queue_timeout = 60;
- }
- set_heartbeat_interval(queue_timeout);
-
- // free temp resources
- xhash_free(namespaces);
- namespaces = NULL;
- }
-
- void resolver::send_query(xmppd::lwresc::lwquery const& query) {
- // get binary representation of the query
- std::ostringstream query_bin;
- query_bin << query;
-
- // send it
- mio_write(lwresd_socket, NULL, query_bin.str().c_str(), query_bin.str().length());
- }
-
- sigc::connection resolver::register_result_callback(uint32_t serial, sigc::signal<void, xmppd::lwresc::lwresult const&>::slot_type const& callback) {
- if (result_listeners.find(serial) == result_listeners.end()) {
- result_listeners[serial] = std::pair<time_t, sigc::signal<void, xmppd::lwresc::lwresult const&> >(std::time(NULL), sigc::signal<void, xmppd::lwresc::lwresult const&>());
- }
-
- return result_listeners[serial].second.connect(callback);
- }
-
- resend_service::resend_service(xmlnode resend) : weight_sum(0) {
- char const* service_attribute_value = xmlnode_get_attrib_ns(resend, "service", NULL);
-
- // if there is a service attribute, keep it
- if (service_attribute_value)
- service = service_attribute_value;
-
- // check, get and iterate partial childs
- xht namespaces = xhash_new(3);
- xhash_put(namespaces, "dnsrv", const_cast<char*>(NS_JABBERD_CONFIG_DNSRV));
- xmlnode_vector partial_elements = xmlnode_get_tags(resend, "dnsrv:partial", namespaces);
- xhash_free(namespaces);
- namespaces = NULL;
- for (xmlnode_vector::iterator p = partial_elements.begin(); p != partial_elements.end(); ++p) {
- // get the weight for this partial destination
- char const* weight_attrib = xmlnode_get_attrib_ns(*p, "weight", NULL);
- int weight = 1;
- if (weight_attrib) {
- std::istringstream weight_stream(weight_attrib);
- weight_stream >> weight;
- }
- if (weight < 1)
- weight = 1;
-
- // get the destination
- char const* resend_dest = xmlnode_get_data(*p);
- if (!resend_dest)
- continue;
- try {
- xmppd::jabberid resend_jid(resend_dest);
-
- // keep
- resend_hosts.push_back(std::pair<int, xmppd::jabberid>(weight, resend_jid));
- weight_sum += weight;
- } catch (std::invalid_argument) {
- continue;
- }
-
- }
-
- // if there where no partial childs, use the text() child of the <resend/> element
- if (resend_hosts.empty()) {
- try {
- char const* resend_dest = xmlnode_get_data(resend);
- if (resend_dest) {
- xmppd::jabberid resend_jid(resend_dest);
-
- // keep
- resend_hosts.push_back(std::pair<int, xmppd::jabberid>(1, resend_jid));
- weight_sum++;
- }
- } catch (std::invalid_argument) {
- }
- }
-
- // still no valid resend_hosts?
- if (resend_hosts.empty()) {
- throw std::invalid_argument("resend config contains no valid destination");
- }
- }
-
- bool resend_service::is_explicit_service() const {
- return service.length() > 0;
- }
-
- Glib::ustring const& resend_service::get_service_prefix() const {
- return service;
- }
-
- result resolver::on_stanza_packet(dpacket dp) {
- // sanity check
- if (!dp || !dp->host)
- return r_ERR;
-
- // check if the packet already has been resolved (in the case of looping packets)
- if (xmlnode_get_attrib_ns(dp->x, "ip", NULL) || xmlnode_get_attrib_ns(dp->x, "iperror", NULL)) {
- char const* packet_type = xmlnode_get_attrib_ns(dp->x, "type", NULL);
-
- // drop type='error', bounce everything else
- if (packet_type && Glib::ustring(packet_type) == "error") {
- log_warn(dp->host, "Looping DNS request. Dropped: %s", xmlnode_serialize_string(dp->x, xmppd::ns_decl_list(), 0));
- xmlnode_free(dp->x);
- } else {
- deliver_fail(dp, N_("Looping DNS request. Dropped."));
- }
-
- return r_DONE;
- }
-
- // is there already a resolve request pending for this domain? Just add to queue for this resolving
- if (pending_jobs.find(dp->host) != pending_jobs.end()) {
- pending_jobs[dp->host]->add_packet(dp);
- return r_DONE;
- }
-
- // store the packet, so that we can forward it when it has been resolved, and start resolving
- pending_jobs[dp->host] = new resolver_job(*this, dp);
- return r_DONE;
- }
-
- resolver_job::resolver_job(resolver& owner, dpacket dp) : owner(owner), waited_srv_serial(0) {
+ resolver_job::resolver_job(resolver& owner, dpacket dp) : owner(owner) {
// sanity check
if (!dp->host) {
throw std::invalid_argument("dpacket has no host");
@@ -294,7 +82,6 @@
xmppd::lwresc::rrsetbyname query(name_to_resolve.str(), ns_c_in, ns_t_srv);
// register result callback
- waited_srv_serial = query.getSerial();
connected_signals.push_back(owner.register_result_callback(query.getSerial(), sigc::mem_fun(*this, &xmppd::resolver::resolver_job::on_srv_query_result)));
// send query
@@ -311,15 +98,114 @@
}
}
- void resolver_job::resolve_providing_hosts() {
- // XXX implement this method
+ void resolver_job::on_a_query_result(xmppd::lwresc::lwresult const& result) {
+ // did we successfully get a result?
+ if (result.getResult() == xmppd::lwresc::lwresult::res_success) {
+ // we got a result, process it
+ try {
+ xmppd::lwresc::lwresult_rrset const* rrSet = dynamic_cast<xmppd::lwresc::lwresult_rrset const*>(result.getRData());
+ if (rrSet == NULL) {
+ return;
+ }
+
+ // get the returned records
+ std::vector<xmppd::lwresc::rrecord *> rrs = rrSet->getRR();
+
+ // iterate them to find A records
+ for (std::vector<xmppd::lwresc::rrecord *>::const_iterator p = rrs.begin(); p != rrs.end(); ++p) {
+ try {
+ xmppd::lwresc::a_record const* rr = dynamic_cast<xmppd::lwresc::a_record const*>(*p);
+
+ result_buffer << " " << rr->getAddress() << ":" << current_providing_host->second;
+ } catch (std::bad_cast) {
+ }
+ }
+
+ } catch (std::bad_cast) {
+ // we expected to get a lwresult_rrset on successful resolving of our query
+ }
+
+ }
+
+ // resolve the next providing host
+ ++current_providing_host;
+
+ // resolve the now current host
+ resolve_current_providing_host();
}
- void resolver_job::on_srv_query_result(xmppd::lwresc::lwresult const& result) {
- // ignore all results we are not waiting for
- if (result.getSerial() != waited_srv_serial)
+ void resolver_job::resolve_current_providing_host_a() {
+ // create the query
+ xmppd::lwresc::rrsetbyname query(current_providing_host->first, ns_c_in, ns_t_a);
+
+ // register result callback
+ connected_signals.push_back(owner.register_result_callback(query.getSerial(), sigc::mem_fun(*this, &xmppd::resolver::resolver_job::on_a_query_result)));
+
+ // send query
+ owner.send_query(query);
+ }
+
+ void resolver_job::on_aaaa_query_result(xmppd::lwresc::lwresult const& result) {
+ // did we successfully get a result?
+ if (result.getResult() == xmppd::lwresc::lwresult::res_success) {
+ // we got a result, process it
+ try {
+ xmppd::lwresc::lwresult_rrset const* rrSet = dynamic_cast<xmppd::lwresc::lwresult_rrset const*>(result.getRData());
+ if (rrSet == NULL) {
+ return;
+ }
+
+ // get the returned records
+ std::vector<xmppd::lwresc::rrecord *> rrs = rrSet->getRR();
+
+ // iterate them to find AAAA records
+ for (std::vector<xmppd::lwresc::rrecord *>::const_iterator p = rrs.begin(); p != rrs.end(); ++p) {
+ try {
+ xmppd::lwresc::aaaa_record const* rr = dynamic_cast<xmppd::lwresc::aaaa_record const*>(*p);
+
+ result_buffer << " [" << rr->getAddress() << "]:" << current_providing_host->second;
+ } catch (std::bad_cast) {
+ }
+ }
+
+ } catch (std::bad_cast) {
+ // we expected to get a lwresult_rrset on successful resolving of our query
+ }
+ }
+
+ // try A lookup
+ resolve_current_providing_host_a();
+ return;
+ }
+
+ void resolver_job::resolve_current_providing_host() {
+ if (current_providing_host == providing_hosts.end()) {
+ // we resolved everything, notify listener
+ for (std::list<sigc::signal<void, resolver_job&> >::iterator p = result_listeners.begin(); p != result_listeners.end(); ++p) {
+ p->emit(*this);
+ }
return;
+ }
+
+ // create the query
+ xmppd::lwresc::rrsetbyname query(current_providing_host->first, ns_c_in, ns_t_aaaa);
+ // register result callback
+ connected_signals.push_back(owner.register_result_callback(query.getSerial(), sigc::mem_fun(*this, &xmppd::resolver::resolver_job::on_aaaa_query_result)));
+
+ // send query
+ owner.send_query(query);
+ }
+
+ void resolver_job::resolve_providing_hosts() {
+ // make the first one the current
+ current_providing_host = providing_hosts.begin();
+
+ // resolve the now current host
+ resolve_current_providing_host();
+ }
+
+ void resolver_job::on_srv_query_result(xmppd::lwresc::lwresult const& result) {
// did we successfully get a result?
if (result.getResult() != xmppd::lwresc::lwresult::res_success) {
// try next service
@@ -328,19 +214,74 @@
return;
}
- // XXX implement this method
+ // we got a result, process it
+ try {
+ xmppd::lwresc::lwresult_rrset const* rrSet = dynamic_cast<xmppd::lwresc::lwresult_rrset const*>(result.getRData());
+ if (rrSet == NULL) {
+ return;
+ }
+
+ // get the returned records
+ std::vector<xmppd::lwresc::rrecord *> rrs = rrSet->getRR();
+
+ // iterate them to find SRV records
+ bool found_srv_record = false;
+ for (std::vector<xmppd::lwresc::rrecord *>::const_iterator p = rrs.begin(); p != rrs.end(); ++p) {
+ try {
+ xmppd::lwresc::srv_record const* rr = dynamic_cast<xmppd::lwresc::srv_record const*>(*p);
+ found_srv_record = true;
+
+ // XXX the following line is only for debugging
+ std::cout << "One SRV result for " << destination << "/" << current_service->get_service_prefix() << " is: "
+ << rr->getPrio() << " " << rr->getWeight() << " " << rr->getDName() << ":" << rr->getPort() << std::endl;
+
+ // XXX we have to sort by priority and weight
+
+ // for now, add unsorted to the list
+ std::ostringstream port;
+ port << rr->getPort();
+ providing_hosts.push_back(std::pair<Glib::ustring, Glib::ustring>(rr->getDName(), port.str()));
+
+ } catch (std::bad_cast) {
+ // it hasn't been a SRV record - we can ignore it
+ }
+ }
+
+ // if we found something we have to resolve the providing hosts, else try next service
+ if (found_srv_record) {
+ // resolve the returned locations to IP addresses
+ resolve_providing_hosts();
+ } else {
+ // try next service
+ ++current_service;
+ start_resolving_service();
+ }
+
+ } catch (std::bad_cast) {
+ // we expected to get a lwresult_rrset on successful resolving of our query
+ }
+ }
+
+ sigc::connection resolver_job::register_result_callback(sigc::signal<void, resolver_job&>::slot_type const& callback) {
+ sigc::signal<void, resolver_job&> new_signal = sigc::signal<void, resolver_job&>();
+ sigc::connection result = new_signal.connect(callback);
+ result_listeners.push_back(new_signal);
+ return result;
}
- }
-}
-/**
- * init and register the resolver component in the server
- *
- * @todo care for destructing the resolver instance on shutdown
- *
- * @param i the jabber server's data about this instance
- * @param x xmlnode of this instances configuration (???)
- */
-extern "C" void resolver(instance i, xmlnode x) {
- xmppd::resolver::resolver* ri = new xmppd::resolver::resolver(i, x);
+ Glib::ustring resolver_job::get_result() const {
+ std::string result = result_buffer.str();
+ if (result.length() < 1)
+ return result;
+ return result.substr(1);
+ }
+
+ std::list<dpacket> const& resolver_job::get_packets() const {
+ return waiting_packets;
+ }
+
+ xmppd::jabberid resolver_job::get_resend_host() const {
+ return current_service->get_resend_host();
+ }
+ }
}
More information about the dev
mailing list