/* * Copyright (c) 1999 Apple Computer, Inc. All rights reserved. * * @APPLE_LICENSE_HEADER_START@ * * "Portions Copyright (c) 1999 Apple Computer, Inc. All Rights * Reserved. This file contains Original Code and/or Modifications of * Original Code as defined in and that are subject to the Apple Public * Source License Version 1.0 (the 'License'). You may not use this file * except in compliance with the License. Please obtain a copy of the * License at http://www.apple.com/publicsource and read it before using * this file. * * The Original Code and all software distributed under the License are * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. Please see the * License for the specific language governing rights and limitations * under the License." * * @APPLE_LICENSE_HEADER_END@ */ /* * Notification thread routines. * Copyright (C) 1989 by NeXT, Inc. * * The notification thread runs only on the master. It notifies * clone servers about changes to the database or resynchronization * requests from the master. */ #include #include "ni_server.h" #include #include #include #include #include #include #include "ni_globals.h" #include #include "getstuff.h" #include "ni_notify.h" #include #include #include #include #include const char *procname(int); const bool_t takes_arg(int); unsigned int clone_count = 0; #define NI_SEPARATOR '/' /* Separator used in NetInfo property values */ #define NOTIFY_TIMEOUT 60 /* Time to wait before giving up on clone */ /* * The notification thread knows about the clone servers through * this data structure. */ typedef struct client_node *clone_list; struct client_node { ni_name name; /* host name of clone */ unsigned long addr; /* IP address of clone - in network byte order*/ ni_name tag; /* Which database served by clone (by tag) */ syslock *lock; /* Try only one update per clone */ long lastupdate; /* Serialize updates */ bool_t up_to_date; /* Clone's up-to-date due to readall */ clone_list next; /* next clone */ int retain; /* ref count (only valid in first node) */ }; #define NO_SEQUENCE_NUMBER -1 /* * This union encodes all possible arguments involved in NetInfo write * calls. */ typedef union write_any_args { ni_create_args create_args; ni_destroy_args destroy_args; ni_proplist_stuff write_args; ni_createprop_args createprop_args; ni_prop_args destroyprop_args; ni_propname_args renameprop_args; ni_writeprop_args writeprop_args; ni_createname_args createname_args; ni_nameindex_args destroy_name_args; ni_writename_args writename_args; } write_any_args; /* * This union encodes all possible results from NetInfo write calls. */ typedef union write_any_res { ni_create_res create_res; ni_id_res destroy_res; ni_id_res write_res; ni_id_res createprop_res; ni_id_res destroyprop_res; ni_id_res renameprop_res; ni_id_res writeprop_res; ni_id_res createname_res; ni_id_res destroyname_res; ni_id_res writename_res; } write_any_res; /* * We store the XDR routines associated with each NetInfo write call * in a table with this data structure for each entry. */ typedef struct xdr_table_entry { unsigned proc; /* procedure number of write call */ unsigned insize; /* size of input arguments */ xdrproc_t xdr_in; /* xdr routine for input arguments */ unsigned outsize; /* size of output results */ xdrproc_t xdr_out; /* xdr routine for output results */ } xdr_table_entry; /* * Macro used to initialize the table */ #define PUSH(proc, arg_type, res_type) \ { proc, sizeof(arg_type), xdr_##arg_type, \ sizeof(res_type), xdr_##res_type } /* * The table itself */ static const xdr_table_entry xdr_table[] = { PUSH(_NI_CREATE, ni_create_args, ni_create_res), PUSH(_NI_DESTROY, ni_destroy_args, ni_id_res), PUSH(_NI_WRITE, ni_proplist_stuff, ni_id_res), PUSH(_NI_CREATEPROP, ni_createprop_args, ni_id_res), PUSH(_NI_DESTROYPROP, ni_prop_args, ni_id_res), PUSH(_NI_RENAMEPROP, ni_propname_args, ni_id_res), PUSH(_NI_WRITEPROP, ni_writeprop_args, ni_id_res), PUSH(_NI_CREATENAME, ni_createname_args, ni_id_res), PUSH(_NI_WRITEPROP, ni_writeprop_args, ni_id_res), PUSH(_NI_CREATENAME, ni_createname_args, ni_id_res), PUSH(_NI_DESTROYNAME, ni_nameindex_args, ni_id_res), PUSH(_NI_WRITENAME, ni_writename_args, ni_id_res), PUSH(_NI_RESYNC, void, ni_status), }; #undef PUSH #define XDR_TABLE_SIZE (sizeof(xdr_table)/sizeof(xdr_table[0])) static const xdr_table_entry *xdr_table_lookup(unsigned); /* * The list of notifications to be sent out */ typedef struct notify_node *notify_list; struct notify_node { unsigned proc; /* the procedure to execute on the clone */ unsigned checksum; /* checksum after applying this operation */ write_any_args args; /* the arguments to supply */ clone_list newlist; /* new list of clones, if proc is RESYNC */ notify_list next; /* next item on list */ }; /* * Arguments to the notify() thread */ typedef struct notify_args { clone_list list; /* list of clone servers */ unsigned checksum; /* checksum of master database */ } notify_args; static void notify(notify_args *); /* * The queue of notifications to be sent */ static volatile notify_list notifications; /* * For locking the queue */ static syslock *notify_syslock = NULL; /* * For tracking the status of update-distributing subthreads. */ static syslock *subthread_syslock; static volatile int notify_subthread_count = 0; static volatile long next_update_number = 0; typedef struct { clone_list clones; notify_list updates; long seqno; } fork_push_args; static void push(clone_list, notify_list updates, long seq); static void fork_push(clone_list, notify_list updates, long seq); static void push_thread_stub(fork_push_args *); unsigned count_procs(notify_list); bool_t udp_getregister(ni_name, ni_name, struct sockaddr_in, struct timeval, nibind_getregister_res *, char *); bool_t tcp_getregister(ni_name, ni_name, struct sockaddr_in, struct timeval, nibind_getregister_res *, char *); clone_list global_clone_list = NULL; /* Assumes there's only one! */ /* * Destroys a client handle with locks */ static void clnt_destroy_lock(CLIENT *cl, int sock) { socket_lock(); clnt_destroy(cl); close(sock); socket_unlock(); } /* * Does this entry serve the "." domain? * Return the name and tag of the server if it does serve "." */ static unsigned servesdot(ni_entry entry, ni_index ei, ni_name *name, ni_name *tag) { ni_name sep; ni_name slashname; unsigned addr; ni_namelist nl; ni_id id; if (entry.names == NULL) return (0); id.nii_object = entry.id; slashname = entry.names->ninl_val[ei]; sep = index(slashname, NI_SEPARATOR); if (sep == NULL) return 0; if (!ni_name_match_seg(NAME_DOT, slashname, sep - slashname)) return 0; NI_INIT(&nl); if (ni_lookupprop(db_ni, &id, NAME_IP_ADDRESS, &nl) != NI_OK) return 0; if (nl.ninl_len == 0) return 0; addr = inet_addr(nl.ninl_val[0]); ni_namelist_free(&nl); *tag = ni_name_dup(sep + 1); NI_INIT(&nl); if (ni_lookupprop(db_ni, &id, NAME_NAME, &nl) != NI_OK) { *name = NULL; } else { if (nl.ninl_len > 0) *name = ni_name_dup(nl.ninl_val[0]); ni_namelist_free(&nl); } return (addr); } /* * Retains a (global) copy of a clone list */ static clone_list clonelist_retain(clone_list clist) { if (clist == NULL) return clist; clist->retain++; return clist; } /* * Frees up a clone list */ static void clonelist_release(clone_list clist) { clone_list l; if (clist == NULL) return; clist->retain--; if (clist->retain > 0) return; while (clist != NULL) { l = clist; clist = clist->next; ni_name_free(&l->name); ni_name_free(&l->tag); syslock_free(l->lock); MM_FREE(l); } } /* * Creates a list of clone servers from the database */ static clone_list clonelist_create() { ni_id id; ni_index i; unsigned addr; struct in_addr x; clone_list new; clone_list clist; ni_name name = NULL; ni_name tag = NULL; ni_idlist idlist; ni_entrylist entries; clone_count = 0; /* Start off assuming none to simplify code */ if (ni_root(db_ni, &id) != NI_OK) return NULL; NI_INIT(&idlist); if (ni_lookup(db_ni, &id, NAME_NAME, NAME_MACHINES, &idlist) != NI_OK) return NULL; id.nii_object = idlist.niil_val[0]; ni_idlist_free(&idlist); if (ni_list_const(db_ni, &id, NAME_SERVES, &entries) != NI_OK) return NULL; clist = NULL; for (i = 0; i < entries.niel_len; i++) { ni_entry entry = entries.niel_val[i]; int i; if (entry.names == NULL) continue; for (i = 0; i < entry.names->ninl_len; i++) { addr = servesdot(entry, i, &name, &tag); if (addr == 0) continue; x.s_addr = addr; /* * Don't notify self */ if ((sys_is_my_address(&x)) && (!strcmp(tag, db_tag))) { ni_name_free(&name); ni_name_free(&tag); continue; } clone_count++; MM_ALLOC(new); new->name = name; new->addr = addr; new->tag = tag; new->lastupdate = 0; new->lock = syslock_new(0); new->up_to_date = TRUE; new->next = clist; new->retain = 1; clist = new; } } ni_list_const_free(db_ni); return (clist); } /* * Is there a notifier thread running? */ static int have_notifier; /* * Starts up a notification thread. Is smart enough not to start one * if the clone list is empty. */ int notify_start(void) { notify_args *args; clone_list clist; clone_list xlist; systhread *t; clist = clonelist_create(); xlist = global_clone_list; global_clone_list = clonelist_retain(clist); clonelist_release(xlist); /* If no clones, we don't need a notify thread */ if (clist == NULL) return 0; MM_ALLOC(args); args->list = clist; args->checksum = ni_getchecksum(db_ni); if (notify_syslock == NULL) notify_syslock = syslock_new(0); if (subthread_syslock == NULL) subthread_syslock = syslock_new(0); t = systhread_new(); systhread_set_name(t, "notify"); systhread_run(t, (void(*)(void *))notify, (void *)args); have_notifier++; return 1; } /* * Send out a resync notification. This procedure is special-cased * because the master updates its list of clone servers when it gets * a resync call. */ void notify_resync(void) { notify_args *args; clone_list clist; clone_list xlist; systhread *t; clist = clonelist_create(); xlist = global_clone_list; global_clone_list = clonelist_retain(clist); clonelist_release(xlist); get_readall_info(db_ni, &max_readall_proxies, &strict_proxies); max_subthreads = get_max_subthreads(db_ni); update_latency_secs = get_update_latency(db_ni); if (!have_notifier) { if (clist != NULL) { MM_ALLOC(args); args->list = clist; args->checksum = ni_getchecksum(db_ni); if (notify_syslock == NULL) notify_syslock = syslock_new(0); if (subthread_syslock == NULL) subthread_syslock = syslock_new(0); t = systhread_new(); systhread_set_name(t, "notify"); systhread_run(t, (void(*)(void *))notify, (void *)args); have_notifier++; } } else { notify_clients(_NI_RESYNC, clist); } } /* * The notification thread */ static void notify(notify_args *args) { clone_list l; notify_list newlist; notify_list lastelt; struct timeval tv; CLIENT *cl; int sock; struct sockaddr_in sin; nibind_getregister_res res; bool_t notify_waited = FALSE; enum clnt_stat cstat; int clones_skipped = 0; tv.tv_sec = NOTIFY_TIMEOUT; tv.tv_usec = 0; sin.sin_family = AF_INET; MM_ZERO(sin.sin_zero); /* * First, tell everybody that you've crashed */ system_log(LOG_INFO, "ni_crashed notification starting {%u}: %d clone%s", ni_getchecksum(db_ni), count_clones(), 1 == count_clones() ? "" : "s"); systhread_set_name(systhread_current(), "notify-crashed"); for (l = args->list; l != NULL; l = l->next) { sin.sin_port = 0; sin.sin_addr.s_addr = l->addr; if (! (udp_getregister(l->name, l->tag, sin, tv, &res, "ni_crashed") || tcp_getregister(l->name, l->tag, sin, tv, &res, "ni_crashed"))) { /* the *_getregister functions do their own logging */ clones_skipped++; continue; } if (res.status != NI_OK) { system_log(LOG_WARNING, "ni_crashed found %s/%s unregistered", inet_ntoa(sin.sin_addr), l->tag); clones_skipped++; continue; } sin.sin_port = htons(res.nibind_getregister_res_u.addrs.tcp_port); sock = socket_connect(&sin, NI_PROG, NI_VERS); if (sock < 0) { system_log(LOG_WARNING, "ni_crashed can't connect to " "%s/%s - %m", inet_ntoa(sin.sin_addr), l->tag); clones_skipped++; continue; } /* * Protect the main thread from using the set of ALL known * RPC file descriptors (svc_fdset) by excluding those FDs * associated with the client side operations. */ FD_SET(sock, &clnt_fdset); /* protect client socket */ cl = clnttcp_create(&sin, NI_PROG, NI_VERS, &sock, 0, 0); if (cl == NULL) { system_log(LOG_WARNING, "ni_crashed can't create " "%s/%s%s", inet_ntoa(sin.sin_addr), l->tag, clnt_spcreateerror("")); clones_skipped++; socket_close(sock); FD_CLR(sock, &clnt_fdset); /* unprotect client socket */ continue; } cstat = clnt_call(cl, _NI_CRASHED, xdr_u_int, &args->checksum, xdr_void, NULL, tv); if (RPC_SUCCESS != cstat) { system_log(LOG_WARNING, "ni_crashed can't send " "to %s/%s - %s", inet_ntoa(sin.sin_addr), l->tag, clnt_sperrno(cstat)); clones_skipped++; } else { system_log(LOG_DEBUG, "ni_crashed to %s[%s]/%s:%hu", l->name, inet_ntoa(sin.sin_addr), l->tag, ntohs(sin.sin_port)); } clnt_destroy_lock(cl, sock); FD_CLR(sock, &clnt_fdset); /* unprotect client socket */ } if (0 == clones_skipped) { system_log(LOG_INFO, "ni_crashed sent"); } else { system_log(LOG_INFO, "ni_crashed sent; %d clone%s skipped", clones_skipped, (1 == clones_skipped ? "" : "s")); } clones_skipped = 0; systhread_set_name(systhread_current(), "notify"); /* * Now, should wait for things to be added to the * update queue and send them off to its clients. */ tv.tv_usec = 0; syslock_lock(notify_syslock); for (;;) { syslock_signal_wait(notify_syslock); /* ** GRS 2/12/92 - Force a ten-second latency after each ping ** to help combining adjacent operations into grouped updates. */ syslock_unlock(notify_syslock); systhread_sleep(update_latency_secs); syslock_lock(notify_syslock); while (notifications != NULL) { if (notifications->proc == _NI_RESYNC) { /* ** GRS 2/12/92 - Since this operation affects ** the clone list, we need to wait for all ** current threads to die off before proceeding. */ syslock_unlock(notify_syslock); syslock_lock(subthread_syslock); systhread_set_name(systhread_current(), "notify-resync"); if (0 != notify_subthread_count) { system_log(LOG_DEBUG, "resync waiting on %d thread%s", notify_subthread_count, 1 == notify_subthread_count ? "" : "s"); notify_waited = TRUE; } else { notify_waited = FALSE; } while (notify_subthread_count) { /* Wait for some threads to die */ syslock_signal_wait(subthread_syslock); } syslock_unlock(subthread_syslock); syslock_lock(notify_syslock); /* Perform the resync operation */ if (notify_waited) { system_log(LOG_DEBUG, "resync continuing"); } clonelist_release(args->list); args->list = notifications->newlist; lastelt = notifications; notifications = notifications->next; lastelt->next = NULL; syslock_unlock(notify_syslock); push(args->list, lastelt, next_update_number = 0); syslock_lock(notify_syslock); } else { /* Non-resync update */ syslock_unlock(notify_syslock); syslock_lock(subthread_syslock); if (notify_subthread_count >= max_subthreads) { system_log(LOG_DEBUG, "update %d awaiting a thread", next_update_number); } while (notify_subthread_count >= max_subthreads) { /* Wait for some threads to die */ syslock_signal_wait(subthread_syslock); } syslock_unlock(subthread_syslock); syslock_lock(notify_syslock); /* Lop off a bunch of notifications */ newlist = notifications; lastelt = NULL; while (newlist && (newlist->proc != _NI_RESYNC)) { lastelt = newlist; newlist = newlist->next; } if (lastelt) { lastelt->next = NULL; fork_push(args->list, notifications, next_update_number); } notifications = newlist; } next_update_number++; } } syslock_unlock(notify_syslock); } int have_notifications_pending(void) { int num_threads; if (notifications != NULL) { /* * List is not empty */ return (1); } if (!subthread_syslock) { return 0; } syslock_lock(subthread_syslock); num_threads = notify_subthread_count; syslock_unlock(subthread_syslock); return (num_threads != 0); } /* * Notify the clone servers of a change to the database. * proc = procedure to execute on clone * args = arguments to procedure * XXX: procedure is a misnomer - should be notify_clones */ void notify_clients( unsigned proc, void *args ) { notify_list *l; const xdr_table_entry *ent; if (!have_notifier) { if (!notify_start()) { return; } } ent = xdr_table_lookup(proc); if (ent == NULL) { return; } syslock_lock(notify_syslock); for (l = (notify_list *)¬ifications; *l != NULL; l = &(*l)->next); MM_ALLOC(*l); (*l)->proc = proc; if (proc == _NI_RESYNC) { (*l)->newlist = (clone_list)args; } else { bcopy(args, &(*l)->args, ent->insize); bzero(args, ent->insize); } (*l)->checksum = ni_getchecksum(db_ni); (*l)->next = NULL; syslock_signal_send(notify_syslock); syslock_unlock(notify_syslock); } /* * Tries to execute the procedure on each of the clone servers */ static void push(clone_list list, notify_list updates, long seq) { write_any_res res; const xdr_table_entry *ent = NULL; int sock; struct sockaddr_in sin; struct timeval tv; CLIENT *cl; clone_list l; nibind_getregister_res gres; notify_list this; enum clnt_stat clstat; int low_level_success; /* Did we get the low-level stuff done? */ char *msg_stage; /* Stage of operation for error message */ char *msg; /* And a message for the ages */ unsigned clones_skipped = 0; /* How many clones we skipped */ unsigned clones_current = 0; /* Clones we would have skipped */ int nprocs; int i; /* Temporary loop counter */ nprocs = count_procs(updates); system_log(LOG_INFO, "update %d starting: %d clone%s, %d operation%s", seq, count_clones(), 1 == count_clones() ? "" : "s", nprocs, 1 == nprocs ? "" : "s"); tv.tv_sec = NOTIFY_TIMEOUT; tv.tv_usec = 0; sin.sin_family = AF_INET; MM_ZERO(sin.sin_zero); for (l = list; l != NULL; l = l->next) { /* * Wait for any other threads that are pushing * earlier updates to catch up to this clone */ syslock_lock(l->lock); while ((l->lastupdate < (seq - 1)) && (l->lastupdate != NO_SEQUENCE_NUMBER)) { syslock_signal_wait(l->lock); } sin.sin_port = 0; sin.sin_addr.s_addr = l->addr; /* * If this clone has failed on the push of an earlier * sequence number, drop it from this update. It will * have to wait until a resync to set l->lastupdate = 0. */ if (l->lastupdate == NO_SEQUENCE_NUMBER) { /* * If a clone misses an update and then does a * readall, it'll be marked up_to_date, and should * then resume getting notifications. */ if (!l->up_to_date) { /* Skip this clone, indeed */ syslock_unlock(l->lock); syslock_signal_broadcast(l->lock); system_log(LOG_DEBUG, "update %d: skipping %s[%s]/%s:%hu", seq, l->name, inet_ntoa(sin.sin_addr), l->tag, ntohs(sin.sin_port)); clones_skipped++; continue; } else { /* Need to include this clone */ clones_current++; } } /* * Connect to the clone */ low_level_success = 0; /* Assume this low-level stuff bombs */ msg_stage = "binder contacting"; if (! (udp_getregister(l->name, l->tag, sin, tv, &gres, "notify") || tcp_getregister(l->name, l->tag, sin, tv, &gres, "notify"))) { msg = "couldn't getregister"; goto cleanup_locks; } if (gres.status != NI_OK) { /* XXX We use NORESPONSE for more than just parent */ msg = NI_NORESPONSE == gres.status ? "No response from clone" : (char *)ni_error(gres.status); goto cleanup_locks; } sin.sin_port = htons(gres.nibind_getregister_res_u.addrs.tcp_port); msg_stage = "contacting"; msg = "Couldn't get system resources (" "socket or TCP connection)"; sock = socket_connect(&sin, NI_PROG, NI_VERS); if (sock < 0) { goto cleanup_locks; } FD_SET(sock, &clnt_fdset); /* protect client socket */ cl = clnttcp_create(&sin, NI_PROG, NI_VERS, &sock, 0, 0); if (cl == NULL) { socket_close(sock); FD_CLR(sock, &clnt_fdset); /* unprotect client socket */ msg = clnt_spcreateerror("netinfo daemon"); goto cleanup_locks; } /* * Push coalesed changes to the clone */ low_level_success = 1; for (this = updates, i = 0; this; this = this->next, i++) { ent = xdr_table_lookup(this->proc); if (ent == NULL) { abort(); /* Should never happen */ } bzero(&res, ent->outsize); if (takes_arg(this->proc)) { system_log(LOG_DEBUG, 0 == ((nprocs - i) % 50) ? "update %d: %s %u to %s[%s]/%s:%hu {%u}, " "%d procs to go" : "update %d: %s %u to %s[%s]/%s:%hu {%u}", seq, procname(this->proc), /* Next line just grabs first arg */ this->args.create_args.id.nii_object, l->name, inet_ntoa(sin.sin_addr), l->tag, ntohs(sin.sin_port), this->checksum, nprocs - i); } else { system_log(LOG_DEBUG, 0 == ((nprocs - i) % 50) ? "update %d: %s to %s[%s]/%s:%hu {%u}, " "%d procs to go" : "update %d: %s to %s[%s]/%s:%hu {%u}", seq, procname(this->proc), l->name, inet_ntoa(sin.sin_addr), l->tag, ntohs(sin.sin_port), this->checksum, nprocs - i); } clstat = clnt_call(cl, this->proc, ent->xdr_in, &this->args, ent->xdr_out, &res, tv); if (clstat != RPC_SUCCESS) { struct sockaddr_in us; int count; count = sizeof(us); if (0 != getsockname(sock, (struct sockaddr *)&us, &count)) { us.sin_port = htons(-1); } system_log(LOG_ERR, "update %d: %s from port %hu to %s[%s]/%s:%hu " "RPC error - %s", seq, procname(this->proc), ntohs(us.sin_port), l->name, inet_ntoa(sin.sin_addr), l->tag, ntohs(sin.sin_port), clnt_sperrno(clstat)); clnt_destroy_lock(cl, sock); FD_CLR(sock, &clnt_fdset); /* unprotect client socket */ goto cleanup_locks; /* Shouldn't do others */ } /* * Next line uses the write_res part of the union * just 'cuz. No, it's probably not a write, but * it doesn't really matter: all the constituent types * of the union have status as their first member. */ if (res.write_res.status != NI_OK) { struct sockaddr_in us; int count; count = sizeof(us); if (0 != getsockname(sock, (struct sockaddr *)&us, &count)) { us.sin_port = htons(-1); } system_log(LOG_ERR, "update %d: %s from port %hu to %s[%s]/%s:%hu " "failed - %s", seq, procname(this->proc), ntohs(us.sin_port), l->name, inet_ntoa(sin.sin_addr), l->tag, ntohs(sin.sin_port), ni_error(res.write_res.status)); clnt_destroy_lock(cl, sock); FD_CLR(sock, &clnt_fdset); /* unprotect client socket */ goto cleanup_locks; /* Shouldn't do others */ } } /* * Push was successful - reset this clone's sequence number */ clnt_destroy_lock(cl, sock); FD_CLR(sock, &clnt_fdset); /* unprotect client socket */ l->lastupdate = seq; l->up_to_date = TRUE; syslock_unlock(l->lock); syslock_signal_broadcast(l->lock); continue; /* * Push failed - give up on this clone */ cleanup_locks: l->lastupdate = NO_SEQUENCE_NUMBER; l->up_to_date = FALSE; syslock_unlock(l->lock); syslock_signal_broadcast(l->lock); if (!low_level_success) { system_log(LOG_ERR, "update %d to %s[%s]/%s failed during %s - " "%s", seq, l->name, inet_ntoa(sin.sin_addr), l->tag, msg_stage, msg); } } /* * Clean up */ while (updates) { ent = xdr_table_lookup(updates->proc); xdr_free(ent->xdr_in, (void *)&updates->args); this = updates->next; MM_FREE(updates); updates = this; } if (0 == clones_skipped && 0 == clones_current) { system_log(LOG_INFO, "update %d sent", seq); } else if (0 == clones_skipped && 0 != clones_current) { system_log(LOG_INFO, "update %d sent; %d clone%s included per TS enhancement", seq, clones_current, (1 == clones_current ? "" : "s")); } else if (0 != clones_skipped && 0 == clones_current) { system_log(LOG_INFO, "update %d sent; %d clone%s skipped", seq, clones_skipped, (1 == clones_skipped ? "" : "s")); } else { /* 0 != clones_current && 0 != clones_skipped */ system_log(LOG_INFO, "update %d sent; %d clone%s skipped, " "%d clone%s included per TS enhancement", seq, clones_skipped, (1 == clones_skipped ? "" : "s"), clones_current, (1 == clones_current ? "" : "s")); } } /* * Do a push in a separate thread */ static void fork_push(clone_list clones, notify_list updates, long seq) { fork_push_args *args; char buf[64]; systhread *t; if (!(args = (fork_push_args *)malloc(sizeof(fork_push_args)))) return; args->updates = updates; args->clones = clones; args->seqno = seq; syslock_lock(subthread_syslock); t = systhread_new(); sprintf(buf, "notify-%ld", seq); systhread_set_name(t, buf); systhread_run(t, (void(*)(void *))push_thread_stub, (void *)args); notify_subthread_count++; syslock_unlock(subthread_syslock); return; } static void push_thread_stub(fork_push_args *args) { push(args->clones, args->updates, args->seqno); MM_FREE(args); syslock_lock(subthread_syslock); notify_subthread_count--; syslock_signal_send(subthread_syslock); syslock_unlock(subthread_syslock); systhread_exit(); } /* * Looks up the XDR information for the given procedure */ static const xdr_table_entry * xdr_table_lookup(unsigned proc) { int i; for (i = 0; i < XDR_TABLE_SIZE; i++) { if (xdr_table[i].proc == proc) return (&xdr_table[i]); } return NULL; } const char * procname(int procnum) { static char *procname_array[] = { "ping", "statistics", "root", "self", "parent", "create", "destroy", "read", "write", "children", "lookup", "list", "createprop", "destroyprop", "readprop", "writeprop", "renameprop", "listprops", "createname", "destroyname", "readname", "writename", "rparent", "listall", "bind", "readall", "crashed", "resync", "lookupread"}; return(procnum < (sizeof(procname_array) / sizeof(*procname_array)) ? procname_array[procnum] : "*UNKNOWN*"); } int notifications_pending(void) { int i; notify_list n; /* If lock is unitialized, nothing's pending */ if (notify_syslock == NULL) return 0; syslock_lock(notify_syslock); for (i = 0, n = notifications; NULL != n; n = n->next) i++; syslock_unlock(notify_syslock); return i; } /* Encapsulate the implementations, rather than making the variables visible */ int count_notify_subthreads(void) { return(notify_subthread_count); } int count_clones(void) { return(clone_count); } const bool_t takes_arg(int procnum) { switch (procnum) { case _NI_RESYNC: case _NI_RPARENT: case _NI_ROOT: case _NI_STATISTICS: case _NI_PING: return FALSE; } return TRUE; } unsigned count_procs(notify_list updates) { int i; notify_list n; for (i = 0, n = updates; n != NULL; n = n->next) i++; return i; } bool_t udp_getregister(ni_name name, ni_name tag, struct sockaddr_in sin, struct timeval tv, nibind_getregister_res *res, char *caller) { int sock; CLIENT *cl; enum clnt_stat clstat; char *msg_stage; char *msg; sin.sin_port = 0; msg_stage = "opening (UDP)"; sock = socket_open(&sin, NIBIND_PROG, NIBIND_VERS); if (sock < 0) { msg = "socket_open failed"; goto failed; } msg_stage = "creating (UDP)"; FD_SET(sock, &clnt_fdset); /* protect client socket */ cl = clntudp_create(&sin, NIBIND_PROG, NIBIND_VERS, tv, &sock); if (cl == NULL) { msg = clnt_spcreateerror("binder daemon"); socket_close(sock); FD_CLR(sock, &clnt_fdset); /* unprotect client socket */ goto failed; } msg_stage = "connecting (UDP)"; if ((clstat = clnt_call(cl, NIBIND_GETREGISTER, xdr_ni_name, &tag, xdr_nibind_getregister_res, res, tv)) != RPC_SUCCESS) { msg = clnt_sperrno(clstat); clnt_destroy_lock(cl, sock); FD_CLR(sock, &clnt_fdset); /* unprotect client socket */ goto failed; } clnt_destroy_lock(cl, sock); FD_CLR(sock, &clnt_fdset); /* unprotect client socket */ return(TRUE); failed: system_log(LOG_WARNING, "%s's udp binder connection to %s[%s]/%s " "failed during %s - %s", caller, name, inet_ntoa(sin.sin_addr), tag, msg_stage, msg); return(FALSE); } bool_t tcp_getregister(ni_name name, ni_name tag, struct sockaddr_in sin, struct timeval tv, nibind_getregister_res *res, char *caller) { int sock; CLIENT *cl; enum clnt_stat clstat; char *msg_stage; char *msg; sin.sin_port = 0; msg_stage = "opening (TCP)"; sock = socket_connect(&sin, NIBIND_PROG, NIBIND_VERS); if (sock < 0) { msg = "socket_connect failed"; goto failed; } msg_stage = "creating (TCP)"; FD_SET(sock, &clnt_fdset); /* protect client socket */ cl = clnttcp_create(&sin, NIBIND_PROG, NIBIND_VERS, &sock, 0, 0); if (cl == NULL) { msg = clnt_spcreateerror("binder daemon"); socket_close(sock); FD_CLR(sock, &clnt_fdset); /* unprotect client socket */ goto failed; } msg_stage = "connecting (TCP)"; if ((clstat = clnt_call(cl, NIBIND_GETREGISTER, xdr_ni_name, &tag, xdr_nibind_getregister_res, res, tv)) != RPC_SUCCESS) { msg = clnt_sperrno(clstat); clnt_destroy_lock(cl, sock); FD_CLR(sock, &clnt_fdset); /* unprotect client socket */ goto failed; } clnt_destroy_lock(cl, sock); FD_CLR(sock, &clnt_fdset); /* unprotect client socket */ return(TRUE); failed: system_log(LOG_WARNING, "%s's tcp binder connection to %s[%s]/%s " "failed during %s - %s", caller, name, inet_ntoa(sin.sin_addr), tag, msg_stage, msg); return(FALSE); } /* * When a clone completes a readall successfully, we should note that * it's current (i.e., up_to_date), so that additional updates that * arrive before the next resync cleanup time are propagated. */ void notify_mark_clone(const unsigned long addr) { clone_list clist; clone_list clone; clone_list found_clone = NULL; struct in_addr t_addr; /* * Find the clone that did the readall. Mark it as current. * XXX If there are multiple interfaces for this clone's host, * and the readall came from an interface not listed as running * a clone (in the master's /machines), then it's not on our * clone list, and it won't be marked as up_to_date. If there * are multiple clones running on one host, we haven't enough * information to distinguish among them: all we have is the * address of the requesting clone, not its tag. So, we punt * on that one, too. XXX */ clist = clonelist_retain(global_clone_list); for (clone = clist; clone != NULL; clone = clone->next) { /* Search for the clone. */ if (addr == clone->addr) { if (NULL == found_clone) { found_clone = clone; } else { /* Already have one! */ t_addr.s_addr = addr; system_log(LOG_DEBUG, "Multiple clones at %s[%s]; can't mark up_to_date", clone->name, inet_ntoa(t_addr)); clonelist_release(clist); return; } } } if (NULL == found_clone) { t_addr.s_addr = addr; system_log(LOG_INFO, "readall completed from unregistered clone at %s", inet_ntoa(t_addr)); clonelist_release(clist); return; } /* Found exactly one clone for this address. Mark it up_to_date. */ t_addr.s_addr = addr; syslock_lock(found_clone->lock); found_clone->up_to_date = TRUE; system_log(LOG_INFO, "clone %s[%s]/%s marked current; was%s being skipped", found_clone->name, inet_ntoa(t_addr), found_clone->tag, NO_SEQUENCE_NUMBER == found_clone->lastupdate ? "" : "n't"); syslock_unlock(found_clone->lock); clonelist_release(clist); return; }