[xmppd-dev] commit r1554 - in trunk/jabberd14: jabberd jabberd/lib resolver

mail at jabberd.org mail at jabberd.org
Tue Mar 17 01:09:35 CET 2009


Author: mawis
Date: Tue Mar 17 01:09:35 2009
New Revision: 1554

Log:
Work in progress on the new DNS resolver.
- still with bugs (I expect)
- expiring of queries not yet implemented
- correct ordering of result SRV records not yet implemented

Added:
   trunk/jabberd14/resolver/resend_service.cc   (contents, props changed)
      - copied, changed from r1539, trunk/jabberd14/resolver/resolver.cc
   trunk/jabberd14/resolver/resolver_job.cc   (contents, props changed)
      - copied, changed from r1539, trunk/jabberd14/resolver/resolver.cc
Modified:
   trunk/jabberd14/jabberd/deliver.cc
   trunk/jabberd14/jabberd/lib/jabberdlib.h
   trunk/jabberd14/jabberd/lib/lwresc.cc
   trunk/jabberd14/resolver/Makefile.am
   trunk/jabberd14/resolver/resolver.cc
   trunk/jabberd14/resolver/resolver.h

Modified: trunk/jabberd14/jabberd/deliver.cc
==============================================================================
--- trunk/jabberd14/jabberd/deliver.cc	Mon Mar 16 18:21:38 2009	(r1553)
+++ trunk/jabberd14/jabberd/deliver.cc	Tue Mar 17 01:09:35 2009	(r1554)
@@ -138,7 +138,6 @@
  */
 typedef struct deliver_mp_st {
     pth_message_t head;	/**< the standard pth message header */
-    instance i;		/**< the sending instance */
     dpacket p;		/**< the queued packet */
 } _deliver_msg,*deliver_msg;
 
@@ -653,7 +652,7 @@
  * deliver a ::dpacket to an ::instance using the configured XML routings
  *
  * @param p the packet that should be delivered (packet gets consumed)
- * @param i the instance of the sender (!) of the packet
+ * @param i unused/ignored (was: the instance of the sender (!) of the packet)
  */
 void deliver(dpacket p, instance i) {
     ilist a, b;
@@ -667,7 +666,7 @@
 	/* begin delivery of postponed messages */
         deliver_msg d;
         while ((d=(deliver_msg)pth_msgport_get(deliver__mp))!=NULL) {
-            deliver(d->p,d->i);
+            deliver(d->p, NULL);
         }
         pth_msgport_destroy(deliver__mp);
         deliver__mp = NULL;
@@ -691,7 +690,6 @@
         if (deliver__mp == NULL)
             deliver__mp = pth_msgport_create("deliver__");
         
-        d->i = i;
         d->p = p;
         
         pth_msgport_put(deliver__mp, reinterpret_cast<pth_message_t*>(d));

Modified: trunk/jabberd14/jabberd/lib/jabberdlib.h
==============================================================================
--- trunk/jabberd14/jabberd/lib/jabberdlib.h	Mon Mar 16 18:21:38 2009	(r1553)
+++ trunk/jabberd14/jabberd/lib/jabberdlib.h	Tue Mar 17 01:09:35 2009	(r1554)
@@ -1455,6 +1455,8 @@
 	};
 
 	class rrecord {
+	    public:
+		virtual ~rrecord();
 	};
 
 	class srv_record : public rrecord {

Modified: trunk/jabberd14/jabberd/lib/lwresc.cc
==============================================================================
--- trunk/jabberd14/jabberd/lib/lwresc.cc	Mon Mar 16 18:21:38 2009	(r1553)
+++ trunk/jabberd14/jabberd/lib/lwresc.cc	Tue Mar 17 01:09:35 2009	(r1554)
@@ -357,6 +357,9 @@
 	    }
 	}
 
+	rrecord::~rrecord() {
+	}
+
 	srv_record::srv_record(std::istream& is) {
 	    try {
 		uint16_t rrlen = lwresult::read_uint16(is);

Modified: trunk/jabberd14/resolver/Makefile.am
==============================================================================
--- trunk/jabberd14/resolver/Makefile.am	Mon Mar 16 18:21:38 2009	(r1553)
+++ trunk/jabberd14/resolver/Makefile.am	Tue Mar 17 01:09:35 2009	(r1554)
@@ -2,7 +2,7 @@
 
 noinst_HEADERS = resolver.h
 
-libjabberdresolver_la_SOURCES = resolver.cc
+libjabberdresolver_la_SOURCES = resend_service.cc  resolver.cc  resolver_job.cc
 libjabberdresolver_la_LIBADD = $(top_builddir)/jabberd/libjabberd.la
 libjabberdresolver_la_LDFLAGS = @LDFLAGS@ @VERSION_INFO@ -module -version-info 2:0:0
 

Copied and modified: trunk/jabberd14/resolver/resend_service.cc (from r1539, trunk/jabberd14/resolver/resolver.cc)
==============================================================================
--- trunk/jabberd14/resolver/resolver.cc	Wed Sep 10 02:35:57 2008	(r1539, copy source)
+++ trunk/jabberd14/resolver/resend_service.cc	Tue Mar 17 01:09:35 2009	(r1554)
@@ -1,7 +1,7 @@
 /*
  * Copyrights
  * 
- * Copyright (c) 2008 Matthias Wimmer
+ * Copyright (c) 2008/2009 Matthias Wimmer
  *
  * This file is part of jabberd14.
  *
@@ -33,119 +33,6 @@
 
 namespace xmppd {
     namespace resolver {
-	resolver::resolver(instance i, xmlnode x) : instance_base(i, x), lwresd_socket(NULL), queue_timeout(60), lwresd_host("localhost"), lwresd_service("921") {
-	    configurate();
-
-	    open_lwresd_socket();
-	}
-
-	void resolver::open_lwresd_socket() {
-	    int udp_socket = make_netsocket2(lwresd_service, lwresd_host, NETSOCKET_UDP);
-
-	    lwresd_socket = mio_new(udp_socket, mio_callback, this, MIO_CONNECT_RAW);
-	}
-
-	void resolver::mio_callback(mio m, int state, void* arg, xmlnode x, char* buffer, int bufsz) {
-	    // sanity check
-	    if (!arg) {
-		return;
-	    }
-
-	    // make everything a bit nicer and call mio_event
-	    static_cast<resolver*>(arg)->mio_event(m, state, buffer ? std::string(buffer, bufsz) : std::string());
-	}
-
-	void resolver::mio_event(mio m, int state, std::string const& buffer) {
-	    switch (state) {
-		case MIO_CLOSED:
-		    mio_event_closed(m);
-		    break;
-		case MIO_BUFFER:
-		    mio_event_buffer(m, buffer);
-		    break;
-		case MIO_ERROR:
-		default:
-		    mio_event_error(m);
-	    }
-	}
-
-	void resolver::mio_event_buffer(mio m, std::string const& buffer) {
-	    // parse the result
-	    std::istringstream buffer_stream(buffer);
-	    xmppd::lwresc::lwresult query_result(buffer_stream);
-
-	    // send the signal for this result
-	    uint32_t serial = query_result.getSerial();
-	    std::map<uint32_t, std::pair<time_t, sigc::signal<void, xmppd::lwresc::lwresult const&> > >::iterator result_listener = result_listeners.find(serial);
-	    if (result_listener != result_listeners.end()) {
-		result_listener->second.second.emit(query_result);
-		result_listeners.erase(result_listener);
-	    }
-
-	}
-
-	void resolver::mio_event_closed(mio m) {
-	}
-
-	void resolver::mio_event_error(mio m) {
-	}
-
-	std::list<resend_service> const& resolver::get_resend_services() {
-	    return resend_services;
-	}
-
-	void resolver::configurate() {
-	    // get the configuration
-	    xmlnode config = get_instance_config();
-
-	    // get and iterate the resend childs
-	    xht namespaces = xhash_new(3);
-	    xhash_put(namespaces, "dnsrv", const_cast<char*>(NS_JABBERD_CONFIG_DNSRV));
-	    xmlnode_vector resend_elements = xmlnode_get_tags(config, "dnsrv:resend", namespaces);
-	    for (xmlnode_vector::iterator p = resend_elements.begin(); p != resend_elements.end(); ++p) {
-		try {
-		    resend_services.push_back(resend_service(*p));
-		} catch (std::invalid_argument) {
-		}
-	    }
-
-	    // get the queue timeout (time we are waiting to a DNS resolving result
-	    char const* queuetimeout_attrib = xmlnode_get_attrib_ns(config, "queuetimeout", NULL);
-	    if (queuetimeout_attrib) {
-		std::istringstream queuetimeout_stream(queuetimeout_attrib);
-		queuetimeout_stream >> queue_timeout;
-
-		// a timeout of less than 10 seconds does not seem to make sense
-		if (queue_timeout < 10) {
-		    queue_timeout = 10;
-		}
-	    } else {
-		queue_timeout = 60;
-	    }
-	    set_heartbeat_interval(queue_timeout);
-
-	    // free temp resources
-	    xhash_free(namespaces);
-	    namespaces = NULL;
-	}
-
-	void resolver::send_query(xmppd::lwresc::lwquery const& query) {
-	    // get binary representation of the query
-	    std::ostringstream query_bin;
-	    query_bin << query;
-
-	    // send it
-	    mio_write(lwresd_socket, NULL, query_bin.str().c_str(), query_bin.str().length());
-	}
-
-	sigc::connection resolver::register_result_callback(uint32_t serial, sigc::signal<void, xmppd::lwresc::lwresult const&>::slot_type const& callback) {
-	    if (result_listeners.find(serial) == result_listeners.end()) {
-		result_listeners[serial] = std::pair<time_t, sigc::signal<void, xmppd::lwresc::lwresult const&> >(std::time(NULL), sigc::signal<void, xmppd::lwresc::lwresult const&>());
-	    }
-
-	    return result_listeners[serial].second.connect(callback);
-	}
-
 	resend_service::resend_service(xmlnode resend) : weight_sum(0) {
 	    char const* service_attribute_value = xmlnode_get_attrib_ns(resend, "service", NULL);
 
@@ -215,132 +102,18 @@
 	    return service;
 	}
 
-	result resolver::on_stanza_packet(dpacket dp) {
-	    // sanity check
-	    if (!dp || !dp->host)
-		return r_ERR;
-
-	    // check if the packet already has been resolved (in the case of looping packets)
-	    if (xmlnode_get_attrib_ns(dp->x, "ip", NULL) || xmlnode_get_attrib_ns(dp->x, "iperror", NULL)) {
-		char const* packet_type = xmlnode_get_attrib_ns(dp->x, "type", NULL);
-
-		// drop type='error', bounce everything else
-		if (packet_type && Glib::ustring(packet_type) == "error") {
-		    log_warn(dp->host, "Looping DNS request. Dropped: %s", xmlnode_serialize_string(dp->x, xmppd::ns_decl_list(), 0));
-		    xmlnode_free(dp->x);
-		} else {
-		    deliver_fail(dp, N_("Looping DNS request. Dropped."));
-		}
-
-		return r_DONE;
-	    }
-
-	    // is there already a resolve request pending for this domain? Just add to queue for this resolving
-	    if (pending_jobs.find(dp->host) != pending_jobs.end()) {
-		pending_jobs[dp->host]->add_packet(dp);
-		return r_DONE;
-	    }
-
-	    // store the packet, so that we can forward it when it has been resolved, and start resolving
-	    pending_jobs[dp->host] = new resolver_job(*this, dp);
-	    return r_DONE;
-	}
-
-	resolver_job::resolver_job(resolver& owner, dpacket dp) : owner(owner), waited_srv_serial(0) {
-	    // sanity check
-	    if (!dp->host) {
-		throw std::invalid_argument("dpacket has no host");
-	    }
-
-	    // keep the packet
-	    add_packet(dp);
-
-	    // keep destination explicitly for faster access
-	    destination = dp->host;
-
-	    // get the services and resend destinations that we have to use (make copy)
-	    resend_services = owner.get_resend_services();
-
-	    // set current service
-	    current_service = resend_services.begin();
-
-	    // start resolving this service
-	    start_resolving_service();
-	}
-
-	resolver_job::~resolver_job() {
-	    // disconnect all signals pointing to us
-	    for (std::list<sigc::connection>::iterator p = connected_signals.begin(); p != connected_signals.end(); ++p) {
-		p->disconnect();
-	    }
-	}
-
-	void resolver_job::add_packet(dpacket dp) {
-	    waiting_packets.push_back(dp);
-	}
-
-	void resolver_job::start_resolving_service() {
-	    // reset the list of providing hosts
-	    providing_hosts.erase(providing_hosts.begin(), providing_hosts.end());
-
-	    // do we have a service, or do have have do plain AAAA+A queries?
-	    if (current_service->is_explicit_service()) {
-		// need to do SRV lookup
-		//
-		// create query
-		std::ostringstream name_to_resolve;
-		name_to_resolve << std::string(current_service->get_service_prefix()) << "." << std::string(destination);
-
-		xmppd::lwresc::rrsetbyname query(name_to_resolve.str(), ns_c_in, ns_t_srv);
-
-		// register result callback
-		waited_srv_serial = query.getSerial();
-		connected_signals.push_back(owner.register_result_callback(query.getSerial(), sigc::mem_fun(*this, &xmppd::resolver::resolver_job::on_srv_query_result)));
-
-		// send query
-		owner.send_query(query);
-	    } else {
-		// no SRV lookup, just plain AAAA+A
-
-		// the hosts providing the service is just the destination on port 5269
-		// so put this in providing_hosts list
-		providing_hosts.push_back(std::pair<Glib::ustring, Glib::ustring>(destination, "5269"));
+	xmppd::jabberid resend_service::get_resend_host() const {
+	    int host_die = std::rand() % weight_sum;
 
-		// no real SRV lookup step needed, so directly start the AAAA+A query stap
-		resolve_providing_hosts();
-	    }
-	}
+	    for (std::list< std::pair<int, xmppd::jabberid> >::const_iterator p = resend_hosts.begin(); p != resend_hosts.end(); ++p) {
+		host_die -= p->first;
 
-	void resolver_job::resolve_providing_hosts() {
-	    // XXX implement this method
-	}
-
-	void resolver_job::on_srv_query_result(xmppd::lwresc::lwresult const& result) {
-	    // ignore all results we are not waiting for
-	    if (result.getSerial() != waited_srv_serial)
-		return;
-
-	    // did we successfully get a result?
-	    if (result.getResult() != xmppd::lwresc::lwresult::res_success) {
-		// try next service
-		++current_service;
-		start_resolving_service();
-		return;
+		if (host_die <= 0) {
+		    return p->second;
+		}
 	    }
 
-	    // XXX implement this method
+	    return resend_hosts.begin()->second;
 	}
     }
 }
-
-/**
- * init and register the resolver component in the server
- *
- * @todo care for destructing the resolver instance on shutdown
- *
- * @param i the jabber server's data about this instance
- * @param x xmlnode of this instances configuration (???)
- */
-extern "C" void resolver(instance i, xmlnode x) {
-    xmppd::resolver::resolver* ri = new xmppd::resolver::resolver(i, x);
-}

Modified: trunk/jabberd14/resolver/resolver.cc
==============================================================================
--- trunk/jabberd14/resolver/resolver.cc	Mon Mar 16 18:21:38 2009	(r1553)
+++ trunk/jabberd14/resolver/resolver.cc	Tue Mar 17 01:09:35 2009	(r1554)
@@ -1,7 +1,7 @@
 /*
  * Copyrights
  * 
- * Copyright (c) 2008 Matthias Wimmer
+ * Copyright (c) 2008/2009 Matthias Wimmer
  *
  * This file is part of jabberd14.
  *
@@ -146,75 +146,6 @@
 	    return result_listeners[serial].second.connect(callback);
 	}
 
-	resend_service::resend_service(xmlnode resend) : weight_sum(0) {
-	    char const* service_attribute_value = xmlnode_get_attrib_ns(resend, "service", NULL);
-
-	    // if there is a service attribute, keep it
-	    if (service_attribute_value)
-		service = service_attribute_value;
-
-	    // check, get and iterate partial childs
-	    xht namespaces = xhash_new(3);
-	    xhash_put(namespaces, "dnsrv", const_cast<char*>(NS_JABBERD_CONFIG_DNSRV));
-	    xmlnode_vector partial_elements = xmlnode_get_tags(resend, "dnsrv:partial", namespaces);
-	    xhash_free(namespaces);
-	    namespaces = NULL;
-	    for (xmlnode_vector::iterator p = partial_elements.begin(); p != partial_elements.end(); ++p) {
-		// get the weight for this partial destination
-		char const* weight_attrib = xmlnode_get_attrib_ns(*p, "weight", NULL);
-		int weight = 1;
-		if (weight_attrib) {
-		    std::istringstream weight_stream(weight_attrib);
-		    weight_stream >> weight;
-		}
-		if (weight < 1)
-		    weight = 1;
-
-		// get the destination
-		char const* resend_dest = xmlnode_get_data(*p);
-		if (!resend_dest)
-		    continue;
-		try {
-		    xmppd::jabberid resend_jid(resend_dest);
-
-		    // keep
-		    resend_hosts.push_back(std::pair<int, xmppd::jabberid>(weight, resend_jid));
-		    weight_sum += weight;
-		} catch (std::invalid_argument) {
-		    continue;
-		}
-
-	    }
-
-	    // if there where no partial childs, use the text() child of the <resend/> element
-	    if (resend_hosts.empty()) {
-		try {
-		    char const* resend_dest = xmlnode_get_data(resend);
-		    if (resend_dest) {
-			xmppd::jabberid resend_jid(resend_dest);
-
-			// keep
-			resend_hosts.push_back(std::pair<int, xmppd::jabberid>(1, resend_jid));
-			weight_sum++;
-		    }
-		} catch (std::invalid_argument) {
-		}
-	    }
-
-	    // still no valid resend_hosts?
-	    if (resend_hosts.empty()) {
-		throw std::invalid_argument("resend config contains no valid destination");
-	    }
-	}
-
-	bool resend_service::is_explicit_service() const {
-	    return service.length() > 0;
-	}
-
-	Glib::ustring const& resend_service::get_service_prefix() const {
-	    return service;
-	}
-
 	result resolver::on_stanza_packet(dpacket dp) {
 	    // sanity check
 	    if (!dp || !dp->host)
@@ -243,92 +174,44 @@
 
 	    // store the packet, so that we can forward it when it has been resolved, and start resolving
 	    pending_jobs[dp->host] = new resolver_job(*this, dp);
+	    pending_jobs[dp->host]->register_result_callback(sigc::mem_fun(*this, &xmppd::resolver::resolver::handle_completed_job));
 	    return r_DONE;
 	}
 
-	resolver_job::resolver_job(resolver& owner, dpacket dp) : owner(owner), waited_srv_serial(0) {
-	    // sanity check
-	    if (!dp->host) {
-		throw std::invalid_argument("dpacket has no host");
-	    }
-
-	    // keep the packet
-	    add_packet(dp);
-
-	    // keep destination explicitly for faster access
-	    destination = dp->host;
-
-	    // get the services and resend destinations that we have to use (make copy)
-	    resend_services = owner.get_resend_services();
-
-	    // set current service
-	    current_service = resend_services.begin();
-
-	    // start resolving this service
-	    start_resolving_service();
-	}
-
-	resolver_job::~resolver_job() {
-	    // disconnect all signals pointing to us
-	    for (std::list<sigc::connection>::iterator p = connected_signals.begin(); p != connected_signals.end(); ++p) {
-		p->disconnect();
-	    }
-	}
-
-	void resolver_job::add_packet(dpacket dp) {
-	    waiting_packets.push_back(dp);
-	}
-
-	void resolver_job::start_resolving_service() {
-	    // reset the list of providing hosts
-	    providing_hosts.erase(providing_hosts.begin(), providing_hosts.end());
-
-	    // do we have a service, or do have have do plain AAAA+A queries?
-	    if (current_service->is_explicit_service()) {
-		// need to do SRV lookup
-		//
-		// create query
-		std::ostringstream name_to_resolve;
-		name_to_resolve << std::string(current_service->get_service_prefix()) << "." << std::string(destination);
-
-		xmppd::lwresc::rrsetbyname query(name_to_resolve.str(), ns_c_in, ns_t_srv);
-
-		// register result callback
-		waited_srv_serial = query.getSerial();
-		connected_signals.push_back(owner.register_result_callback(query.getSerial(), sigc::mem_fun(*this, &xmppd::resolver::resolver_job::on_srv_query_result)));
-
-		// send query
-		owner.send_query(query);
+	void resolver::resend_packet(xmlnode pkt, Glib::ustring ips, Glib::ustring to) {
+	    if (ips.empty()) {
+		jutil_error_xmpp(pkt, (xterror){502, N_("Unable to resolve hostname."), "wait", "service-unavailable"});
+		xmlnode_put_attrib_ns(pkt, "iperror", NULL, NULL, "");
 	    } else {
-		// no SRV lookup, just plain AAAA+A
-
-		// the hosts providing the service is just the destination on port 5269
-		// so put this in providing_hosts list
-		providing_hosts.push_back(std::pair<Glib::ustring, Glib::ustring>(destination, "5269"));
+		char const* dnsresultto = xmlnode_get_attrib_ns(pkt, "dnsqueryby", NULL);
+		if (!dnsresultto) {
+		    dnsresultto = to.c_str();
+		}
 
-		// no real SRV lookup step needed, so directly start the AAAA+A query stap
-		resolve_providing_hosts();
+		pkt = xmlnode_wrap_ns(pkt, "route", NULL, NULL);
+		xmlnode_put_attrib_ns(pkt, "to", NULL, NULL, dnsresultto);
+		xmlnode_put_attrib_ns(pkt, "ip", NULL, NULL, ips.c_str());
 	    }
+	    deliver(pkt);
 	}
 
-	void resolver_job::resolve_providing_hosts() {
-	    // XXX implement this method
-	}
+	void resolver::handle_completed_job(resolver_job& job) {
+	    // get the packets
+	    std::list<dpacket> const& packets = job.get_packets();
 
-	void resolver_job::on_srv_query_result(xmppd::lwresc::lwresult const& result) {
-	    // ignore all results we are not waiting for
-	    if (result.getSerial() != waited_srv_serial)
-		return;
+	    // get the resend destination
+	    Glib::ustring resend_host = job.get_resend_host().full();
 
-	    // did we successfully get a result?
-	    if (result.getResult() != xmppd::lwresc::lwresult::res_success) {
-		// try next service
-		++current_service;
-		start_resolving_service();
-		return;
+	    // get the resolved ips
+	    Glib::ustring ips = job.get_result();
+
+	    // resend the packets
+	    for (std::list<dpacket>::const_iterator p = packets.begin(); p != packets.end(); ++p) {
+		resend_packet((*p)->x, ips, resend_host);
 	    }
 
-	    // XXX implement this method
+	    // we are done with it, we can delete the job
+	    delete &job;
 	}
     }
 }

Modified: trunk/jabberd14/resolver/resolver.h
==============================================================================
--- trunk/jabberd14/resolver/resolver.h	Mon Mar 16 18:21:38 2009	(r1553)
+++ trunk/jabberd14/resolver/resolver.h	Tue Mar 17 01:09:35 2009	(r1554)
@@ -56,6 +56,12 @@
 		 */
 		Glib::ustring const& get_service_prefix() const;
 
+		/**
+		 * select one of the resend_hosts for this service
+		 *
+		 * @return choosen resend_host
+		 */
+		xmppd::jabberid get_resend_host() const;
 	    private:
 		/**
 		 * the service to resolve, empty string for no service but plain AAAA/A lookups
@@ -107,6 +113,36 @@
 		 * @throws std::invalid_argument if the packet has a different destination, that what is being resolved by this job
 		 */
 		void add_packet(dpacket dp);
+
+		/**
+		 * get the waiting packets
+		 *
+		 * @return list of waiting packets
+		 */
+		std::list<dpacket> const& get_packets() const;
+
+		/**
+		 * register for being notified on finishing the job
+		 *
+		 * @param callback what should get notified on finishing the job
+		 */
+		sigc::connection register_result_callback(sigc::signal<void, resolver_job&>::slot_type const& callback);
+
+		/**
+		 * get the resolving result
+		 *
+		 * @return string containing IP/port pairs as the result to connect to
+		 */
+		Glib::ustring get_result() const;
+
+		/**
+		 * get the service where to send packets to that have been resolved by this job
+		 *
+		 * @return the service
+		 */
+		xmppd::jabberid get_resend_host() const;
+
+
 	    private:
 		/**
 		 * the destination, that is being resolved by this job
@@ -150,6 +186,11 @@
 		std::list< std::pair<Glib::ustring, Glib::ustring> > providing_hosts;
 
 		/**
+		 * the providing host, that is currently resolved
+		 */
+		std::list< std::pair<Glib::ustring, Glib::ustring> >::const_iterator current_providing_host;
+
+		/**
 		 * do AAAA and A lookups for the hosts in providing_hosts
 		 *
 		 * This gets called after providing_hosts has been filled, either explicitly
@@ -159,6 +200,18 @@
 		void resolve_providing_hosts();
 
 		/**
+		 * do AAAA and A lookups for the current_current_providing host
+		 *
+		 * This get called while iterating the entries in providing_hosts after updating current_providing_host
+		 */
+		void resolve_current_providing_host();
+
+		/**
+		 * do the remaining A lookup after the AAAA lookup has been done for the current_current_providing host
+		 */
+		void resolve_current_providing_host_a();
+
+		/**
 		 * list of signals to disconnect on destruction
 		 */
 		std::list<sigc::connection> connected_signals;
@@ -169,9 +222,24 @@
 		void on_srv_query_result(xmppd::lwresc::lwresult const& result);
 
 		/**
-		 * the serial for the SRV query that we have sent last
+		 * handle results of AAAA queries to the DNS
+		 */
+		void on_aaaa_query_result(xmppd::lwresc::lwresult const& result);
+
+		/**
+		 * handle results of A queries to the DNS
+		 */
+		void on_a_query_result(xmppd::lwresc::lwresult const& result);
+
+		/**
+		 * buffer taking the result of resolving the job
 		 */
-		uint32_t waited_srv_serial;
+		std::ostringstream result_buffer;
+
+		/**
+		 * listeners to get notified if the resolver_job is finished
+		 */
+		std::list<sigc::signal<void, resolver_job&> > result_listeners;
 	};
 
 	/**
@@ -212,6 +280,24 @@
 
 	    private:
 		/**
+		 * resend a resolved packet to the configured service
+		 *
+		 * the packet may overwrite the destination where it wants to get resent to
+		 *
+		 * @param pkt the packet to resend
+		 * @param ips the resolving result to add to the packet
+		 * @param to the service to send the packet to (if not overwritten by the pkt itself)
+		 */
+		void resend_packet(xmlnode pkt, Glib::ustring ips, Glib::ustring to);
+
+		/**
+		 * handles completed resolvings
+		 *
+		 * @param job the resolver_job that completed
+		 */
+		void handle_completed_job(resolver_job& job);
+
+		/**
 		 * handle received stanzas
 		 *
 		 * @param dp the stanza to handle

Copied and modified: trunk/jabberd14/resolver/resolver_job.cc (from r1539, trunk/jabberd14/resolver/resolver.cc)
==============================================================================
--- trunk/jabberd14/resolver/resolver.cc	Wed Sep 10 02:35:57 2008	(r1539, copy source)
+++ trunk/jabberd14/resolver/resolver_job.cc	Tue Mar 17 01:09:35 2009	(r1554)
@@ -1,7 +1,7 @@
 /*
  * Copyrights
  * 
- * Copyright (c) 2008 Matthias Wimmer
+ * Copyright (c) 2008/2009 Matthias Wimmer
  *
  * This file is part of jabberd14.
  *
@@ -30,223 +30,11 @@
  */
 
 #include <resolver.h>
+#include <iostream>
 
 namespace xmppd {
     namespace resolver {
-	resolver::resolver(instance i, xmlnode x) : instance_base(i, x), lwresd_socket(NULL), queue_timeout(60), lwresd_host("localhost"), lwresd_service("921") {
-	    configurate();
-
-	    open_lwresd_socket();
-	}
-
-	void resolver::open_lwresd_socket() {
-	    int udp_socket = make_netsocket2(lwresd_service, lwresd_host, NETSOCKET_UDP);
-
-	    lwresd_socket = mio_new(udp_socket, mio_callback, this, MIO_CONNECT_RAW);
-	}
-
-	void resolver::mio_callback(mio m, int state, void* arg, xmlnode x, char* buffer, int bufsz) {
-	    // sanity check
-	    if (!arg) {
-		return;
-	    }
-
-	    // make everything a bit nicer and call mio_event
-	    static_cast<resolver*>(arg)->mio_event(m, state, buffer ? std::string(buffer, bufsz) : std::string());
-	}
-
-	void resolver::mio_event(mio m, int state, std::string const& buffer) {
-	    switch (state) {
-		case MIO_CLOSED:
-		    mio_event_closed(m);
-		    break;
-		case MIO_BUFFER:
-		    mio_event_buffer(m, buffer);
-		    break;
-		case MIO_ERROR:
-		default:
-		    mio_event_error(m);
-	    }
-	}
-
-	void resolver::mio_event_buffer(mio m, std::string const& buffer) {
-	    // parse the result
-	    std::istringstream buffer_stream(buffer);
-	    xmppd::lwresc::lwresult query_result(buffer_stream);
-
-	    // send the signal for this result
-	    uint32_t serial = query_result.getSerial();
-	    std::map<uint32_t, std::pair<time_t, sigc::signal<void, xmppd::lwresc::lwresult const&> > >::iterator result_listener = result_listeners.find(serial);
-	    if (result_listener != result_listeners.end()) {
-		result_listener->second.second.emit(query_result);
-		result_listeners.erase(result_listener);
-	    }
-
-	}
-
-	void resolver::mio_event_closed(mio m) {
-	}
-
-	void resolver::mio_event_error(mio m) {
-	}
-
-	std::list<resend_service> const& resolver::get_resend_services() {
-	    return resend_services;
-	}
-
-	void resolver::configurate() {
-	    // get the configuration
-	    xmlnode config = get_instance_config();
-
-	    // get and iterate the resend childs
-	    xht namespaces = xhash_new(3);
-	    xhash_put(namespaces, "dnsrv", const_cast<char*>(NS_JABBERD_CONFIG_DNSRV));
-	    xmlnode_vector resend_elements = xmlnode_get_tags(config, "dnsrv:resend", namespaces);
-	    for (xmlnode_vector::iterator p = resend_elements.begin(); p != resend_elements.end(); ++p) {
-		try {
-		    resend_services.push_back(resend_service(*p));
-		} catch (std::invalid_argument) {
-		}
-	    }
-
-	    // get the queue timeout (time we are waiting to a DNS resolving result
-	    char const* queuetimeout_attrib = xmlnode_get_attrib_ns(config, "queuetimeout", NULL);
-	    if (queuetimeout_attrib) {
-		std::istringstream queuetimeout_stream(queuetimeout_attrib);
-		queuetimeout_stream >> queue_timeout;
-
-		// a timeout of less than 10 seconds does not seem to make sense
-		if (queue_timeout < 10) {
-		    queue_timeout = 10;
-		}
-	    } else {
-		queue_timeout = 60;
-	    }
-	    set_heartbeat_interval(queue_timeout);
-
-	    // free temp resources
-	    xhash_free(namespaces);
-	    namespaces = NULL;
-	}
-
-	void resolver::send_query(xmppd::lwresc::lwquery const& query) {
-	    // get binary representation of the query
-	    std::ostringstream query_bin;
-	    query_bin << query;
-
-	    // send it
-	    mio_write(lwresd_socket, NULL, query_bin.str().c_str(), query_bin.str().length());
-	}
-
-	sigc::connection resolver::register_result_callback(uint32_t serial, sigc::signal<void, xmppd::lwresc::lwresult const&>::slot_type const& callback) {
-	    if (result_listeners.find(serial) == result_listeners.end()) {
-		result_listeners[serial] = std::pair<time_t, sigc::signal<void, xmppd::lwresc::lwresult const&> >(std::time(NULL), sigc::signal<void, xmppd::lwresc::lwresult const&>());
-	    }
-
-	    return result_listeners[serial].second.connect(callback);
-	}
-
-	resend_service::resend_service(xmlnode resend) : weight_sum(0) {
-	    char const* service_attribute_value = xmlnode_get_attrib_ns(resend, "service", NULL);
-
-	    // if there is a service attribute, keep it
-	    if (service_attribute_value)
-		service = service_attribute_value;
-
-	    // check, get and iterate partial childs
-	    xht namespaces = xhash_new(3);
-	    xhash_put(namespaces, "dnsrv", const_cast<char*>(NS_JABBERD_CONFIG_DNSRV));
-	    xmlnode_vector partial_elements = xmlnode_get_tags(resend, "dnsrv:partial", namespaces);
-	    xhash_free(namespaces);
-	    namespaces = NULL;
-	    for (xmlnode_vector::iterator p = partial_elements.begin(); p != partial_elements.end(); ++p) {
-		// get the weight for this partial destination
-		char const* weight_attrib = xmlnode_get_attrib_ns(*p, "weight", NULL);
-		int weight = 1;
-		if (weight_attrib) {
-		    std::istringstream weight_stream(weight_attrib);
-		    weight_stream >> weight;
-		}
-		if (weight < 1)
-		    weight = 1;
-
-		// get the destination
-		char const* resend_dest = xmlnode_get_data(*p);
-		if (!resend_dest)
-		    continue;
-		try {
-		    xmppd::jabberid resend_jid(resend_dest);
-
-		    // keep
-		    resend_hosts.push_back(std::pair<int, xmppd::jabberid>(weight, resend_jid));
-		    weight_sum += weight;
-		} catch (std::invalid_argument) {
-		    continue;
-		}
-
-	    }
-
-	    // if there where no partial childs, use the text() child of the <resend/> element
-	    if (resend_hosts.empty()) {
-		try {
-		    char const* resend_dest = xmlnode_get_data(resend);
-		    if (resend_dest) {
-			xmppd::jabberid resend_jid(resend_dest);
-
-			// keep
-			resend_hosts.push_back(std::pair<int, xmppd::jabberid>(1, resend_jid));
-			weight_sum++;
-		    }
-		} catch (std::invalid_argument) {
-		}
-	    }
-
-	    // still no valid resend_hosts?
-	    if (resend_hosts.empty()) {
-		throw std::invalid_argument("resend config contains no valid destination");
-	    }
-	}
-
-	bool resend_service::is_explicit_service() const {
-	    return service.length() > 0;
-	}
-
-	Glib::ustring const& resend_service::get_service_prefix() const {
-	    return service;
-	}
-
-	result resolver::on_stanza_packet(dpacket dp) {
-	    // sanity check
-	    if (!dp || !dp->host)
-		return r_ERR;
-
-	    // check if the packet already has been resolved (in the case of looping packets)
-	    if (xmlnode_get_attrib_ns(dp->x, "ip", NULL) || xmlnode_get_attrib_ns(dp->x, "iperror", NULL)) {
-		char const* packet_type = xmlnode_get_attrib_ns(dp->x, "type", NULL);
-
-		// drop type='error', bounce everything else
-		if (packet_type && Glib::ustring(packet_type) == "error") {
-		    log_warn(dp->host, "Looping DNS request. Dropped: %s", xmlnode_serialize_string(dp->x, xmppd::ns_decl_list(), 0));
-		    xmlnode_free(dp->x);
-		} else {
-		    deliver_fail(dp, N_("Looping DNS request. Dropped."));
-		}
-
-		return r_DONE;
-	    }
-
-	    // is there already a resolve request pending for this domain? Just add to queue for this resolving
-	    if (pending_jobs.find(dp->host) != pending_jobs.end()) {
-		pending_jobs[dp->host]->add_packet(dp);
-		return r_DONE;
-	    }
-
-	    // store the packet, so that we can forward it when it has been resolved, and start resolving
-	    pending_jobs[dp->host] = new resolver_job(*this, dp);
-	    return r_DONE;
-	}
-
-	resolver_job::resolver_job(resolver& owner, dpacket dp) : owner(owner), waited_srv_serial(0) {
+	resolver_job::resolver_job(resolver& owner, dpacket dp) : owner(owner) {
 	    // sanity check
 	    if (!dp->host) {
 		throw std::invalid_argument("dpacket has no host");
@@ -294,7 +82,6 @@
 		xmppd::lwresc::rrsetbyname query(name_to_resolve.str(), ns_c_in, ns_t_srv);
 
 		// register result callback
-		waited_srv_serial = query.getSerial();
 		connected_signals.push_back(owner.register_result_callback(query.getSerial(), sigc::mem_fun(*this, &xmppd::resolver::resolver_job::on_srv_query_result)));
 
 		// send query
@@ -311,15 +98,114 @@
 	    }
 	}
 
-	void resolver_job::resolve_providing_hosts() {
-	    // XXX implement this method
+	void resolver_job::on_a_query_result(xmppd::lwresc::lwresult const& result) {
+	    // did we successfully get a result?
+	    if (result.getResult() == xmppd::lwresc::lwresult::res_success) {
+		// we got a result, process it
+		try {
+		    xmppd::lwresc::lwresult_rrset const* rrSet = dynamic_cast<xmppd::lwresc::lwresult_rrset const*>(result.getRData());
+		    if (rrSet == NULL) {
+			return;
+		    }
+
+		    // get the returned records
+		    std::vector<xmppd::lwresc::rrecord *> rrs = rrSet->getRR();
+
+		    // iterate them to find SRV records
+		    for (std::vector<xmppd::lwresc::rrecord *>::const_iterator p = rrs.begin(); p != rrs.end(); ++p) {
+			try {
+			    xmppd::lwresc::a_record const* rr = dynamic_cast<xmppd::lwresc::a_record const*>(*p);
+
+			    result_buffer << " " << rr->getAddress() << ":" << current_providing_host->second;
+			} catch (std::bad_cast) {
+			}
+		    }
+
+		} catch (std::bad_cast) {
+		    // we expected to get a lwresult_rrset on successfull resolving of our query
+		}
+
+	    }
+
+	    // resolve the next providing host
+	    ++current_providing_host;
+
+	    // resolve the now current host
+	    resolve_current_providing_host();
 	}
 
-	void resolver_job::on_srv_query_result(xmppd::lwresc::lwresult const& result) {
-	    // ignore all results we are not waiting for
-	    if (result.getSerial() != waited_srv_serial)
+	void resolver_job::resolve_current_providing_host_a() {
+	    // create the query
+	    xmppd::lwresc::rrsetbyname query(current_providing_host->first, ns_c_in, ns_t_a);
+
+	    // register result callback
+	    connected_signals.push_back(owner.register_result_callback(query.getSerial(), sigc::mem_fun(*this, &xmppd::resolver::resolver_job::on_a_query_result)));
+	    
+	    // send query
+	    owner.send_query(query);
+	}
+
+	void resolver_job::on_aaaa_query_result(xmppd::lwresc::lwresult const& result) {
+	    // did we successfully get a result?
+	    if (result.getResult() == xmppd::lwresc::lwresult::res_success) {
+		// we got a result, process it
+		try {
+		    xmppd::lwresc::lwresult_rrset const* rrSet = dynamic_cast<xmppd::lwresc::lwresult_rrset const*>(result.getRData());
+		    if (rrSet == NULL) {
+			return;
+		    }
+
+		    // get the returned records
+		    std::vector<xmppd::lwresc::rrecord *> rrs = rrSet->getRR();
+
+		    // iterate them to find SRV records
+		    for (std::vector<xmppd::lwresc::rrecord *>::const_iterator p = rrs.begin(); p != rrs.end(); ++p) {
+			try {
+			    xmppd::lwresc::aaaa_record const* rr = dynamic_cast<xmppd::lwresc::aaaa_record const*>(*p);
+
+			    result_buffer << " [" << rr->getAddress() << "]:" << current_providing_host->second;
+			} catch (std::bad_cast) {
+			}
+		    }
+
+		} catch (std::bad_cast) {
+		    // we expected to get a lwresult_rrset on successfull resolving of our query
+		}
+	    }
+
+	    // try A lookup
+	    resolve_current_providing_host_a();
+	    return;
+	}
+
+	void resolver_job::resolve_current_providing_host() {
+	    if (current_providing_host == providing_hosts.end()) {
+		// we resolved everything, notify listener
+		for (std::list<sigc::signal<void, resolver_job&> >::iterator p = result_listeners.begin(); p != result_listeners.end(); ++p) {
+		    p->emit(*this);
+		}
 		return;
+	    }
+
+	    // create the query
+	    xmppd::lwresc::rrsetbyname query(current_providing_host->first, ns_c_in, ns_t_aaaa);
 
+	    // register result callback
+	    connected_signals.push_back(owner.register_result_callback(query.getSerial(), sigc::mem_fun(*this, &xmppd::resolver::resolver_job::on_aaaa_query_result)));
+	    
+	    // send query
+	    owner.send_query(query);
+	}
+
+	void resolver_job::resolve_providing_hosts() {
+	    // make the first one the current
+	    current_providing_host = providing_hosts.begin();
+
+	    // resolve the now current host
+	    resolve_current_providing_host();
+	}
+
+	void resolver_job::on_srv_query_result(xmppd::lwresc::lwresult const& result) {
 	    // did we successfully get a result?
 	    if (result.getResult() != xmppd::lwresc::lwresult::res_success) {
 		// try next service
@@ -328,19 +214,74 @@
 		return;
 	    }
 
-	    // XXX implement this method
+	    // we got a result, process it
+	    try {
+		xmppd::lwresc::lwresult_rrset const* rrSet = dynamic_cast<xmppd::lwresc::lwresult_rrset const*>(result.getRData());
+		if (rrSet == NULL) {
+		    return;
+		}
+
+		// get the returned records
+		std::vector<xmppd::lwresc::rrecord *> rrs = rrSet->getRR();
+
+		// iterate them to find SRV records
+		bool found_srv_record = false;
+		for (std::vector<xmppd::lwresc::rrecord *>::const_iterator p = rrs.begin(); p != rrs.end(); ++p) {
+		    try {
+			xmppd::lwresc::srv_record const* rr = dynamic_cast<xmppd::lwresc::srv_record const*>(*p);
+			found_srv_record = true;
+
+			// XXX the following line is only for debugging
+			std::cout << "One SRV result for " << destination << "/" << current_service->get_service_prefix() << " is: "
+			    << rr->getPrio() << " " << rr->getWeight() << " " << rr->getDName() << ":" << rr->getPort() << std::endl;
+
+			// XXX we have to sort by priority and weight
+
+			// for now, add unsorted to the list
+			std::ostringstream port;
+			port << rr->getPort();
+			providing_hosts.push_back(std::pair<Glib::ustring, Glib::ustring>(rr->getDName(), port.str()));
+
+		    } catch (std::bad_cast) {
+			// it hasn't been a SRV record - we can ignore it
+		    }
+		}
+
+		// if we found something we have to resolve the providing hosts, else try next service
+		if (found_srv_record) {
+		    // resolve the returned locations to IP addresses
+		    resolve_providing_hosts();
+		} else {
+		    // try next service
+		    ++current_service;
+		    start_resolving_service();
+		}
+
+	    } catch (std::bad_cast) {
+		// we expected to get a lwresult_rrset on successfull resolving of our query
+	    }
+	}
+
+	sigc::connection resolver_job::register_result_callback(sigc::signal<void, resolver_job&>::slot_type const& callback) {
+	    sigc::signal<void, resolver_job&> new_signal = sigc::signal<void, resolver_job&>();
+	    sigc::connection result = new_signal.connect(callback);
+	    result_listeners.push_back(new_signal);
+	    return result;
 	}
-    }
-}
 
-/**
- * init and register the resolver component in the server
- *
- * @todo care for destructing the resolver instance on shutdown
- *
- * @param i the jabber server's data about this instance
- * @param x xmlnode of this instances configuration (???)
- */
-extern "C" void resolver(instance i, xmlnode x) {
-    xmppd::resolver::resolver* ri = new xmppd::resolver::resolver(i, x);
+	Glib::ustring resolver_job::get_result() const {
+	    std::string result = result_buffer.str();
+	    if (result.length() < 1)
+		return result;
+	    return result.substr(1);
+	}
+
+	std::list<dpacket> const& resolver_job::get_packets() const {
+	    return waiting_packets;
+	}
+
+	xmppd::jabberid resolver_job::get_resend_host() const {
+	    return current_service->get_resend_host();
+	}
+    }
 }


More information about the dev mailing list