/*
* Copyright (c) 1999 Apple Computer, Inc. All rights reserved.
*
* @APPLE_LICENSE_HEADER_START@
*
* "Portions Copyright (c) 1999 Apple Computer, Inc. All Rights
* Reserved. This file contains Original Code and/or Modifications of
* Original Code as defined in and that are subject to the Apple Public
* Source License Version 1.0 (the 'License'). You may not use this file
* except in compliance with the License. Please obtain a copy of the
* License at http://www.apple.com/publicsource and read it before using
* this file.
*
* The Original Code and all software distributed under the License are
* distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
* EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
* INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. Please see the
* License for the specific language governing rights and limitations
* under the License."
*
* @APPLE_LICENSE_HEADER_END@
*/
/*
* Notification thread routines.
* Copyright (C) 1989 by NeXT, Inc.
*
* The notification thread runs only on the master. It notifies
* clone servers about changes to the database or resynchronization
* requests from the master.
*/
#include <NetInfo/config.h>
#include "ni_server.h"
#include <sys/socket.h>
#include <sys/time.h>
#include <NetInfo/systhread.h>
#include <stdio.h>
#include <NetInfo/system_log.h>
#include <NetInfo/network.h>
#include "ni_globals.h"
#include <NetInfo/mm.h>
#include "getstuff.h"
#include "ni_notify.h"
#include <NetInfo/socket_lock.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <rpc/xdr.h>
const char *procname(int);
const bool_t takes_arg(int);
unsigned int clone_count = 0;
#define NI_SEPARATOR '/' /* Separator used in NetInfo property values */
#define NOTIFY_TIMEOUT 60 /* Time to wait before giving up on clone */
/*
* The notification thread knows about the clone servers through
* this data structure.
*/
typedef struct client_node *clone_list;
struct client_node
{
ni_name name; /* host name of clone */
unsigned long addr; /* IP address of clone - in network byte order*/
ni_name tag; /* Which database served by clone (by tag) */
syslock *lock; /* Try only one update per clone */
long lastupdate; /* Serialize updates */
bool_t up_to_date; /* Clone's up-to-date due to readall */
clone_list next; /* next clone */
int retain; /* ref count (only valid in first node) */
};
#define NO_SEQUENCE_NUMBER -1
/*
* This union encodes all possible arguments involved in NetInfo write
* calls.
*/
typedef union write_any_args
{
ni_create_args create_args;
ni_destroy_args destroy_args;
ni_proplist_stuff write_args;
ni_createprop_args createprop_args;
ni_prop_args destroyprop_args;
ni_propname_args renameprop_args;
ni_writeprop_args writeprop_args;
ni_createname_args createname_args;
ni_nameindex_args destroy_name_args;
ni_writename_args writename_args;
} write_any_args;
/*
* This union encodes all possible results from NetInfo write calls.
*/
typedef union write_any_res
{
ni_create_res create_res;
ni_id_res destroy_res;
ni_id_res write_res;
ni_id_res createprop_res;
ni_id_res destroyprop_res;
ni_id_res renameprop_res;
ni_id_res writeprop_res;
ni_id_res createname_res;
ni_id_res destroyname_res;
ni_id_res writename_res;
} write_any_res;
/*
* We store the XDR routines associated with each NetInfo write call
* in a table with this data structure for each entry.
*/
typedef struct xdr_table_entry
{
unsigned proc; /* procedure number of write call */
unsigned insize; /* size of input arguments */
xdrproc_t xdr_in; /* xdr routine for input arguments */
unsigned outsize; /* size of output results */
xdrproc_t xdr_out; /* xdr routine for output results */
} xdr_table_entry;
/*
* Macro used to initialize the table
*/
#define PUSH(proc, arg_type, res_type) \
{ proc, sizeof(arg_type), xdr_##arg_type, \
sizeof(res_type), xdr_##res_type }
/*
* The table itself
*/
static const xdr_table_entry xdr_table[] =
{
PUSH(_NI_CREATE, ni_create_args, ni_create_res),
PUSH(_NI_DESTROY, ni_destroy_args, ni_id_res),
PUSH(_NI_WRITE, ni_proplist_stuff, ni_id_res),
PUSH(_NI_CREATEPROP, ni_createprop_args, ni_id_res),
PUSH(_NI_DESTROYPROP, ni_prop_args, ni_id_res),
PUSH(_NI_RENAMEPROP, ni_propname_args, ni_id_res),
PUSH(_NI_WRITEPROP, ni_writeprop_args, ni_id_res),
PUSH(_NI_CREATENAME, ni_createname_args, ni_id_res),
PUSH(_NI_WRITEPROP, ni_writeprop_args, ni_id_res),
PUSH(_NI_CREATENAME, ni_createname_args, ni_id_res),
PUSH(_NI_DESTROYNAME, ni_nameindex_args, ni_id_res),
PUSH(_NI_WRITENAME, ni_writename_args, ni_id_res),
PUSH(_NI_RESYNC, void, ni_status),
};
#undef PUSH
#define XDR_TABLE_SIZE (sizeof(xdr_table)/sizeof(xdr_table[0]))
static const xdr_table_entry *xdr_table_lookup(unsigned);
/*
* The list of notifications to be sent out
*/
typedef struct notify_node *notify_list;
struct notify_node
{
unsigned proc; /* the procedure to execute on the clone */
unsigned checksum; /* checksum after applying this operation */
write_any_args args; /* the arguments to supply */
clone_list newlist; /* new list of clones, if proc is RESYNC */
notify_list next; /* next item on list */
};
/*
* Arguments to the notify() thread
*/
typedef struct notify_args
{
clone_list list; /* list of clone servers */
unsigned checksum; /* checksum of master database */
} notify_args;
static void notify(notify_args *);
/*
* The queue of notifications to be sent
*/
static volatile notify_list notifications;
/*
* For locking the queue
*/
static syslock *notify_syslock = NULL;
/*
* For tracking the status of update-distributing subthreads.
*/
static syslock *subthread_syslock;
static volatile int notify_subthread_count = 0;
static volatile long next_update_number = 0;
typedef struct
{
clone_list clones;
notify_list updates;
long seqno;
} fork_push_args;
static void push(clone_list, notify_list updates, long seq);
static void fork_push(clone_list, notify_list updates, long seq);
static void push_thread_stub(fork_push_args *);
unsigned count_procs(notify_list);
bool_t udp_getregister(ni_name, ni_name, struct sockaddr_in,
struct timeval, nibind_getregister_res *, char *);
bool_t tcp_getregister(ni_name, ni_name, struct sockaddr_in,
struct timeval, nibind_getregister_res *, char *);
clone_list global_clone_list = NULL; /* Assumes there's only one! */
/*
* Destroys a client handle with locks
*/
static void
clnt_destroy_lock(CLIENT *cl, int sock)
{
socket_lock();
clnt_destroy(cl);
close(sock);
socket_unlock();
}
/*
* Does this entry serve the "." domain?
* Return the name and tag of the server if it does serve "."
*/
static unsigned
servesdot(ni_entry entry, ni_index ei, ni_name *name, ni_name *tag)
{
ni_name sep;
ni_name slashname;
unsigned addr;
ni_namelist nl;
ni_id id;
if (entry.names == NULL) return (0);
id.nii_object = entry.id;
slashname = entry.names->ninl_val[ei];
sep = index(slashname, NI_SEPARATOR);
if (sep == NULL) return 0;
if (!ni_name_match_seg(NAME_DOT, slashname, sep - slashname)) return 0;
NI_INIT(&nl);
if (ni_lookupprop(db_ni, &id, NAME_IP_ADDRESS, &nl) != NI_OK) return 0;
if (nl.ninl_len == 0) return 0;
addr = inet_addr(nl.ninl_val[0]);
ni_namelist_free(&nl);
*tag = ni_name_dup(sep + 1);
NI_INIT(&nl);
if (ni_lookupprop(db_ni, &id, NAME_NAME, &nl) != NI_OK)
{
*name = NULL;
}
else
{
if (nl.ninl_len > 0) *name = ni_name_dup(nl.ninl_val[0]);
ni_namelist_free(&nl);
}
return (addr);
}
/*
* Retains a (global) copy of a clone list
*/
static clone_list
clonelist_retain(clone_list clist)
{
if (clist == NULL) return clist;
clist->retain++;
return clist;
}
/*
* Frees up a clone list
*/
static void
clonelist_release(clone_list clist)
{
clone_list l;
if (clist == NULL) return;
clist->retain--;
if (clist->retain > 0) return;
while (clist != NULL)
{
l = clist;
clist = clist->next;
ni_name_free(&l->name);
ni_name_free(&l->tag);
syslock_free(l->lock);
MM_FREE(l);
}
}
/*
* Creates a list of clone servers from the database
*/
static clone_list
clonelist_create()
{
ni_id id;
ni_index i;
unsigned addr;
struct in_addr x;
clone_list new;
clone_list clist;
ni_name name = NULL;
ni_name tag = NULL;
ni_idlist idlist;
ni_entrylist entries;
clone_count = 0; /* Start off assuming none to simplify code */
if (ni_root(db_ni, &id) != NI_OK) return NULL;
NI_INIT(&idlist);
if (ni_lookup(db_ni, &id, NAME_NAME, NAME_MACHINES, &idlist) != NI_OK)
return NULL;
id.nii_object = idlist.niil_val[0];
ni_idlist_free(&idlist);
if (ni_list_const(db_ni, &id, NAME_SERVES, &entries) != NI_OK) return NULL;
clist = NULL;
for (i = 0; i < entries.niel_len; i++)
{
ni_entry entry = entries.niel_val[i];
int i;
if (entry.names == NULL) continue;
for (i = 0; i < entry.names->ninl_len; i++)
{
addr = servesdot(entry, i, &name, &tag);
if (addr == 0) continue;
x.s_addr = addr;
/*
* Don't notify self
*/
if ((sys_is_my_address(&x)) && (!strcmp(tag, db_tag)))
{
ni_name_free(&name);
ni_name_free(&tag);
continue;
}
clone_count++;
MM_ALLOC(new);
new->name = name;
new->addr = addr;
new->tag = tag;
new->lastupdate = 0;
new->lock = syslock_new(0);
new->up_to_date = TRUE;
new->next = clist;
new->retain = 1;
clist = new;
}
}
ni_list_const_free(db_ni);
return (clist);
}
/*
* Is there a notifier thread running?
*/
static int have_notifier;
/*
* Starts up a notification thread. Is smart enough not to start one
* if the clone list is empty.
*/
int
notify_start(void)
{
notify_args *args;
clone_list clist;
clone_list xlist;
systhread *t;
clist = clonelist_create();
xlist = global_clone_list;
global_clone_list = clonelist_retain(clist);
clonelist_release(xlist);
/* If no clones, we don't need a notify thread */
if (clist == NULL) return 0;
MM_ALLOC(args);
args->list = clist;
args->checksum = ni_getchecksum(db_ni);
if (notify_syslock == NULL) notify_syslock = syslock_new(0);
if (subthread_syslock == NULL) subthread_syslock = syslock_new(0);
t = systhread_new();
systhread_set_name(t, "notify");
systhread_run(t, (void(*)(void *))notify, (void *)args);
have_notifier++;
return 1;
}
/*
* Send out a resync notification. This procedure is special-cased
* because the master updates its list of clone servers when it gets
* a resync call.
*/
void
notify_resync(void)
{
notify_args *args;
clone_list clist;
clone_list xlist;
systhread *t;
clist = clonelist_create();
xlist = global_clone_list;
global_clone_list = clonelist_retain(clist);
clonelist_release(xlist);
get_readall_info(db_ni, &max_readall_proxies, &strict_proxies);
max_subthreads = get_max_subthreads(db_ni);
update_latency_secs = get_update_latency(db_ni);
if (!have_notifier)
{
if (clist != NULL)
{
MM_ALLOC(args);
args->list = clist;
args->checksum = ni_getchecksum(db_ni);
if (notify_syslock == NULL) notify_syslock = syslock_new(0);
if (subthread_syslock == NULL) subthread_syslock = syslock_new(0);
t = systhread_new();
systhread_set_name(t, "notify");
systhread_run(t, (void(*)(void *))notify, (void *)args);
have_notifier++;
}
}
else
{
notify_clients(_NI_RESYNC, clist);
}
}
/*
* The notification thread
*/
static void
notify(notify_args *args)
{
clone_list l;
notify_list newlist;
notify_list lastelt;
struct timeval tv;
CLIENT *cl;
int sock;
struct sockaddr_in sin;
nibind_getregister_res res;
bool_t notify_waited = FALSE;
enum clnt_stat cstat;
int clones_skipped = 0;
tv.tv_sec = NOTIFY_TIMEOUT;
tv.tv_usec = 0;
sin.sin_family = AF_INET;
MM_ZERO(sin.sin_zero);
/*
* First, tell everybody that you've crashed
*/
system_log(LOG_INFO,
"ni_crashed notification starting {%u}: %d clone%s",
ni_getchecksum(db_ni), count_clones(), 1 == count_clones() ? "" : "s");
systhread_set_name(systhread_current(), "notify-crashed");
for (l = args->list; l != NULL; l = l->next)
{
sin.sin_port = 0;
sin.sin_addr.s_addr = l->addr;
if (! (udp_getregister(l->name, l->tag, sin, tv, &res, "ni_crashed") ||
tcp_getregister(l->name, l->tag, sin, tv, &res, "ni_crashed")))
{
/* the *_getregister functions do their own logging */
clones_skipped++;
continue;
}
if (res.status != NI_OK)
{
system_log(LOG_WARNING,
"ni_crashed found %s/%s unregistered",
inet_ntoa(sin.sin_addr), l->tag);
clones_skipped++;
continue;
}
sin.sin_port = htons(res.nibind_getregister_res_u.addrs.tcp_port);
sock = socket_connect(&sin, NI_PROG, NI_VERS);
if (sock < 0)
{
system_log(LOG_WARNING, "ni_crashed can't connect to "
"%s/%s - %m",
inet_ntoa(sin.sin_addr), l->tag);
clones_skipped++;
continue;
}
/*
* Protect the main thread from using the set of ALL known
* RPC file descriptors (svc_fdset) by excluding those FDs
* associated with the client side operations.
*/
FD_SET(sock, &clnt_fdset); /* protect client socket */
cl = clnttcp_create(&sin, NI_PROG, NI_VERS, &sock, 0, 0);
if (cl == NULL)
{
system_log(LOG_WARNING, "ni_crashed can't create "
"%s/%s%s",
inet_ntoa(sin.sin_addr), l->tag,
clnt_spcreateerror(""));
clones_skipped++;
socket_close(sock);
FD_CLR(sock, &clnt_fdset); /* unprotect client socket */
continue;
}
cstat = clnt_call(cl, _NI_CRASHED, xdr_u_int,
&args->checksum, xdr_void, NULL, tv);
if (RPC_SUCCESS != cstat)
{
system_log(LOG_WARNING, "ni_crashed can't send "
"to %s/%s - %s",
inet_ntoa(sin.sin_addr), l->tag,
clnt_sperrno(cstat));
clones_skipped++;
}
else
{
system_log(LOG_DEBUG,
"ni_crashed to %s[%s]/%s:%hu",
l->name, inet_ntoa(sin.sin_addr), l->tag,
ntohs(sin.sin_port));
}
clnt_destroy_lock(cl, sock);
FD_CLR(sock, &clnt_fdset); /* unprotect client socket */
}
if (0 == clones_skipped)
{
system_log(LOG_INFO, "ni_crashed sent");
}
else
{
system_log(LOG_INFO, "ni_crashed sent; %d clone%s skipped",
clones_skipped, (1 == clones_skipped ? "" : "s"));
}
clones_skipped = 0;
systhread_set_name(systhread_current(), "notify");
/*
* Now, should wait for things to be added to the
* update queue and send them off to its clients.
*/
tv.tv_usec = 0;
syslock_lock(notify_syslock);
for (;;)
{
syslock_signal_wait(notify_syslock);
/*
** GRS 2/12/92 - Force a ten-second latency after each ping
** to help combining adjacent operations into grouped updates.
*/
syslock_unlock(notify_syslock);
systhread_sleep(update_latency_secs);
syslock_lock(notify_syslock);
while (notifications != NULL)
{
if (notifications->proc == _NI_RESYNC)
{
/*
** GRS 2/12/92 - Since this operation affects
** the clone list, we need to wait for all
** current threads to die off before proceeding.
*/
syslock_unlock(notify_syslock);
syslock_lock(subthread_syslock);
systhread_set_name(systhread_current(), "notify-resync");
if (0 != notify_subthread_count)
{
system_log(LOG_DEBUG,
"resync waiting on %d thread%s",
notify_subthread_count,
1 == notify_subthread_count ?
"" : "s");
notify_waited = TRUE;
}
else
{
notify_waited = FALSE;
}
while (notify_subthread_count)
{
/* Wait for some threads to die */
syslock_signal_wait(subthread_syslock);
}
syslock_unlock(subthread_syslock);
syslock_lock(notify_syslock);
/* Perform the resync operation */
if (notify_waited)
{
system_log(LOG_DEBUG, "resync continuing");
}
clonelist_release(args->list);
args->list = notifications->newlist;
lastelt = notifications;
notifications = notifications->next;
lastelt->next = NULL;
syslock_unlock(notify_syslock);
push(args->list, lastelt, next_update_number = 0);
syslock_lock(notify_syslock);
}
else
{ /* Non-resync update */
syslock_unlock(notify_syslock);
syslock_lock(subthread_syslock);
if (notify_subthread_count >= max_subthreads)
{
system_log(LOG_DEBUG,
"update %d awaiting a thread",
next_update_number);
}
while (notify_subthread_count >= max_subthreads)
{
/* Wait for some threads to die */
syslock_signal_wait(subthread_syslock);
}
syslock_unlock(subthread_syslock);
syslock_lock(notify_syslock);
/* Lop off a bunch of notifications */
newlist = notifications;
lastelt = NULL;
while (newlist && (newlist->proc != _NI_RESYNC))
{
lastelt = newlist;
newlist = newlist->next;
}
if (lastelt)
{
lastelt->next = NULL;
fork_push(args->list, notifications, next_update_number);
}
notifications = newlist;
}
next_update_number++;
}
}
syslock_unlock(notify_syslock);
}
int
have_notifications_pending(void)
{
int num_threads;
if (notifications != NULL) {
/*
* List is not empty
*/
return (1);
}
if (!subthread_syslock) {
return 0;
}
syslock_lock(subthread_syslock);
num_threads = notify_subthread_count;
syslock_unlock(subthread_syslock);
return (num_threads != 0);
}
/*
* Notify the clone servers of a change to the database.
* proc = procedure to execute on clone
* args = arguments to procedure
* XXX: procedure is a misnomer - should be notify_clones
*/
void
notify_clients(
unsigned proc,
void *args
)
{
notify_list *l;
const xdr_table_entry *ent;
if (!have_notifier) {
if (!notify_start()) {
return;
}
}
ent = xdr_table_lookup(proc);
if (ent == NULL) {
return;
}
syslock_lock(notify_syslock);
for (l = (notify_list *)¬ifications; *l != NULL; l = &(*l)->next);
MM_ALLOC(*l);
(*l)->proc = proc;
if (proc == _NI_RESYNC)
{
(*l)->newlist = (clone_list)args;
}
else
{
bcopy(args, &(*l)->args, ent->insize);
bzero(args, ent->insize);
}
(*l)->checksum = ni_getchecksum(db_ni);
(*l)->next = NULL;
syslock_signal_send(notify_syslock);
syslock_unlock(notify_syslock);
}
/*
* Tries to execute the procedure on each of the clone servers
*/
static void
push(clone_list list, notify_list updates, long seq)
{
write_any_res res;
const xdr_table_entry *ent = NULL;
int sock;
struct sockaddr_in sin;
struct timeval tv;
CLIENT *cl;
clone_list l;
nibind_getregister_res gres;
notify_list this;
enum clnt_stat clstat;
int low_level_success; /* Did we get the low-level stuff done? */
char *msg_stage; /* Stage of operation for error message */
char *msg; /* And a message for the ages */
unsigned clones_skipped = 0; /* How many clones we skipped */
unsigned clones_current = 0; /* Clones we would have skipped */
int nprocs;
int i; /* Temporary loop counter */
nprocs = count_procs(updates);
system_log(LOG_INFO, "update %d starting: %d clone%s, %d operation%s",
seq, count_clones(), 1 == count_clones() ? "" : "s",
nprocs, 1 == nprocs ? "" : "s");
tv.tv_sec = NOTIFY_TIMEOUT;
tv.tv_usec = 0;
sin.sin_family = AF_INET;
MM_ZERO(sin.sin_zero);
for (l = list; l != NULL; l = l->next)
{
/*
* Wait for any other threads that are pushing
* earlier updates to catch up to this clone
*/
syslock_lock(l->lock);
while ((l->lastupdate < (seq - 1)) &&
(l->lastupdate != NO_SEQUENCE_NUMBER))
{
syslock_signal_wait(l->lock);
}
sin.sin_port = 0;
sin.sin_addr.s_addr = l->addr;
/*
* If this clone has failed on the push of an earlier
* sequence number, drop it from this update. It will
* have to wait until a resync to set l->lastupdate = 0.
*/
if (l->lastupdate == NO_SEQUENCE_NUMBER)
{
/*
* If a clone misses an update and then does a
* readall, it'll be marked up_to_date, and should
* then resume getting notifications.
*/
if (!l->up_to_date)
{
/* Skip this clone, indeed */
syslock_unlock(l->lock);
syslock_signal_broadcast(l->lock);
system_log(LOG_DEBUG,
"update %d: skipping %s[%s]/%s:%hu",
seq, l->name,
inet_ntoa(sin.sin_addr), l->tag,
ntohs(sin.sin_port));
clones_skipped++;
continue;
}
else
{
/* Need to include this clone */
clones_current++;
}
}
/*
* Connect to the clone
*/
low_level_success = 0; /* Assume this low-level stuff bombs */
msg_stage = "binder contacting";
if (! (udp_getregister(l->name, l->tag, sin, tv, &gres, "notify") ||
tcp_getregister(l->name, l->tag, sin, tv, &gres, "notify")))
{
msg = "couldn't getregister";
goto cleanup_locks;
}
if (gres.status != NI_OK) {
/* XXX We use NORESPONSE for more than just parent */
msg = NI_NORESPONSE == gres.status ?
"No response from clone" :
(char *)ni_error(gres.status);
goto cleanup_locks;
}
sin.sin_port =
htons(gres.nibind_getregister_res_u.addrs.tcp_port);
msg_stage = "contacting";
msg = "Couldn't get system resources ("
"socket or TCP connection)";
sock = socket_connect(&sin, NI_PROG, NI_VERS);
if (sock < 0) {
goto cleanup_locks;
}
FD_SET(sock, &clnt_fdset); /* protect client socket */
cl = clnttcp_create(&sin, NI_PROG, NI_VERS, &sock, 0, 0);
if (cl == NULL) {
socket_close(sock);
FD_CLR(sock, &clnt_fdset); /* unprotect client socket */
msg = clnt_spcreateerror("netinfo daemon");
goto cleanup_locks;
}
/*
* Push coalesed changes to the clone
*/
low_level_success = 1;
for (this = updates, i = 0; this; this = this->next, i++) {
ent = xdr_table_lookup(this->proc);
if (ent == NULL) {
abort(); /* Should never happen */
}
bzero(&res, ent->outsize);
if (takes_arg(this->proc)) {
system_log(LOG_DEBUG,
0 == ((nprocs - i) % 50) ?
"update %d: %s %u to %s[%s]/%s:%hu {%u}, "
"%d procs to go" :
"update %d: %s %u to %s[%s]/%s:%hu {%u}",
seq, procname(this->proc),
/* Next line just grabs first arg */
this->args.create_args.id.nii_object,
l->name, inet_ntoa(sin.sin_addr), l->tag,
ntohs(sin.sin_port), this->checksum,
nprocs - i);
} else {
system_log(LOG_DEBUG,
0 == ((nprocs - i) % 50) ?
"update %d: %s to %s[%s]/%s:%hu {%u}, "
"%d procs to go" :
"update %d: %s to %s[%s]/%s:%hu {%u}",
seq, procname(this->proc), l->name,
inet_ntoa(sin.sin_addr), l->tag,
ntohs(sin.sin_port), this->checksum,
nprocs - i);
}
clstat = clnt_call(cl, this->proc, ent->xdr_in,
&this->args, ent->xdr_out, &res, tv);
if (clstat != RPC_SUCCESS) {
struct sockaddr_in us;
int count;
count = sizeof(us);
if (0 != getsockname(sock, (struct sockaddr *)&us,
&count)) {
us.sin_port = htons(-1);
}
system_log(LOG_ERR,
"update %d: %s from port %hu to %s[%s]/%s:%hu "
"RPC error - %s", seq,
procname(this->proc), ntohs(us.sin_port),
l->name, inet_ntoa(sin.sin_addr), l->tag,
ntohs(sin.sin_port), clnt_sperrno(clstat));
clnt_destroy_lock(cl, sock);
FD_CLR(sock, &clnt_fdset); /* unprotect client socket */
goto cleanup_locks; /* Shouldn't do others */
}
/*
* Next line uses the write_res part of the union
* just 'cuz. No, it's probably not a write, but
* it doesn't really matter: all the constituent types
* of the union have status as their first member.
*/
if (res.write_res.status != NI_OK) {
struct sockaddr_in us;
int count;
count = sizeof(us);
if (0 != getsockname(sock, (struct sockaddr *)&us,
&count)) {
us.sin_port = htons(-1);
}
system_log(LOG_ERR,
"update %d: %s from port %hu to %s[%s]/%s:%hu "
"failed - %s", seq,
procname(this->proc), ntohs(us.sin_port),
l->name, inet_ntoa(sin.sin_addr), l->tag,
ntohs(sin.sin_port),
ni_error(res.write_res.status));
clnt_destroy_lock(cl, sock);
FD_CLR(sock, &clnt_fdset); /* unprotect client socket */
goto cleanup_locks; /* Shouldn't do others */
}
}
/*
* Push was successful - reset this clone's sequence number
*/
clnt_destroy_lock(cl, sock);
FD_CLR(sock, &clnt_fdset); /* unprotect client socket */
l->lastupdate = seq;
l->up_to_date = TRUE;
syslock_unlock(l->lock);
syslock_signal_broadcast(l->lock);
continue;
/*
* Push failed - give up on this clone
*/
cleanup_locks:
l->lastupdate = NO_SEQUENCE_NUMBER;
l->up_to_date = FALSE;
syslock_unlock(l->lock);
syslock_signal_broadcast(l->lock);
if (!low_level_success)
{
system_log(LOG_ERR,
"update %d to %s[%s]/%s failed during %s - "
"%s", seq,
l->name, inet_ntoa(sin.sin_addr), l->tag,
msg_stage, msg);
}
}
/*
* Clean up
*/
while (updates)
{
ent = xdr_table_lookup(updates->proc);
xdr_free(ent->xdr_in, (void *)&updates->args);
this = updates->next;
MM_FREE(updates);
updates = this;
}
if (0 == clones_skipped && 0 == clones_current)
{
system_log(LOG_INFO, "update %d sent", seq);
}
else if (0 == clones_skipped && 0 != clones_current)
{
system_log(LOG_INFO,
"update %d sent; %d clone%s included per TS enhancement",
seq, clones_current, (1 == clones_current ? "" : "s"));
}
else if (0 != clones_skipped && 0 == clones_current)
{
system_log(LOG_INFO, "update %d sent; %d clone%s skipped", seq,
clones_skipped, (1 == clones_skipped ? "" : "s"));
}
else
{ /* 0 != clones_current && 0 != clones_skipped */
system_log(LOG_INFO, "update %d sent; %d clone%s skipped, "
"%d clone%s included per TS enhancement", seq,
clones_skipped, (1 == clones_skipped ? "" : "s"),
clones_current, (1 == clones_current ? "" : "s"));
}
}
/*
* Do a push in a separate thread
*/
static void
fork_push(clone_list clones, notify_list updates, long seq)
{
fork_push_args *args;
char buf[64];
systhread *t;
if (!(args = (fork_push_args *)malloc(sizeof(fork_push_args)))) return;
args->updates = updates;
args->clones = clones;
args->seqno = seq;
syslock_lock(subthread_syslock);
t = systhread_new();
sprintf(buf, "notify-%ld", seq);
systhread_set_name(t, buf);
systhread_run(t, (void(*)(void *))push_thread_stub, (void *)args);
notify_subthread_count++;
syslock_unlock(subthread_syslock);
return;
}
static void
push_thread_stub(fork_push_args *args)
{
push(args->clones, args->updates, args->seqno);
MM_FREE(args);
syslock_lock(subthread_syslock);
notify_subthread_count--;
syslock_signal_send(subthread_syslock);
syslock_unlock(subthread_syslock);
systhread_exit();
}
/*
* Looks up the XDR information for the given procedure
*/
static const xdr_table_entry *
xdr_table_lookup(unsigned proc)
{
int i;
for (i = 0; i < XDR_TABLE_SIZE; i++)
{
if (xdr_table[i].proc == proc) return (&xdr_table[i]);
}
return NULL;
}
const char *
procname(int procnum)
{
static char *procname_array[] = {
"ping",
"statistics",
"root",
"self",
"parent",
"create",
"destroy",
"read",
"write",
"children",
"lookup",
"list",
"createprop",
"destroyprop",
"readprop",
"writeprop",
"renameprop",
"listprops",
"createname",
"destroyname",
"readname",
"writename",
"rparent",
"listall",
"bind",
"readall",
"crashed",
"resync",
"lookupread"};
return(procnum < (sizeof(procname_array) / sizeof(*procname_array)) ?
procname_array[procnum] :
"*UNKNOWN*");
}
int
notifications_pending(void)
{
int i;
notify_list n;
/* If lock is unitialized, nothing's pending */
if (notify_syslock == NULL) return 0;
syslock_lock(notify_syslock);
for (i = 0, n = notifications; NULL != n; n = n->next) i++;
syslock_unlock(notify_syslock);
return i;
}
/* Encapsulate the implementations, rather than making the variables visible */
int
count_notify_subthreads(void)
{
return(notify_subthread_count);
}
int
count_clones(void)
{
return(clone_count);
}
const bool_t
takes_arg(int procnum)
{
switch (procnum)
{
case _NI_RESYNC:
case _NI_RPARENT:
case _NI_ROOT:
case _NI_STATISTICS:
case _NI_PING: return FALSE;
}
return TRUE;
}
unsigned
count_procs(notify_list updates)
{
int i;
notify_list n;
for (i = 0, n = updates; n != NULL; n = n->next) i++;
return i;
}
bool_t
udp_getregister(ni_name name, ni_name tag, struct sockaddr_in sin,
struct timeval tv, nibind_getregister_res *res, char *caller)
{
int sock;
CLIENT *cl;
enum clnt_stat clstat;
char *msg_stage;
char *msg;
sin.sin_port = 0;
msg_stage = "opening (UDP)";
sock = socket_open(&sin, NIBIND_PROG, NIBIND_VERS);
if (sock < 0)
{
msg = "socket_open failed";
goto failed;
}
msg_stage = "creating (UDP)";
FD_SET(sock, &clnt_fdset); /* protect client socket */
cl = clntudp_create(&sin, NIBIND_PROG, NIBIND_VERS, tv, &sock);
if (cl == NULL)
{
msg = clnt_spcreateerror("binder daemon");
socket_close(sock);
FD_CLR(sock, &clnt_fdset); /* unprotect client socket */
goto failed;
}
msg_stage = "connecting (UDP)";
if ((clstat = clnt_call(cl, NIBIND_GETREGISTER,
xdr_ni_name, &tag,
xdr_nibind_getregister_res,
res, tv)) != RPC_SUCCESS)
{
msg = clnt_sperrno(clstat);
clnt_destroy_lock(cl, sock);
FD_CLR(sock, &clnt_fdset); /* unprotect client socket */
goto failed;
}
clnt_destroy_lock(cl, sock);
FD_CLR(sock, &clnt_fdset); /* unprotect client socket */
return(TRUE);
failed:
system_log(LOG_WARNING, "%s's udp binder connection to %s[%s]/%s "
"failed during %s - %s", caller, name, inet_ntoa(sin.sin_addr),
tag, msg_stage, msg);
return(FALSE);
}
bool_t
tcp_getregister(ni_name name, ni_name tag, struct sockaddr_in sin,
struct timeval tv, nibind_getregister_res *res, char *caller)
{
int sock;
CLIENT *cl;
enum clnt_stat clstat;
char *msg_stage;
char *msg;
sin.sin_port = 0;
msg_stage = "opening (TCP)";
sock = socket_connect(&sin, NIBIND_PROG, NIBIND_VERS);
if (sock < 0)
{
msg = "socket_connect failed";
goto failed;
}
msg_stage = "creating (TCP)";
FD_SET(sock, &clnt_fdset); /* protect client socket */
cl = clnttcp_create(&sin, NIBIND_PROG, NIBIND_VERS, &sock, 0, 0);
if (cl == NULL)
{
msg = clnt_spcreateerror("binder daemon");
socket_close(sock);
FD_CLR(sock, &clnt_fdset); /* unprotect client socket */
goto failed;
}
msg_stage = "connecting (TCP)";
if ((clstat = clnt_call(cl, NIBIND_GETREGISTER,
xdr_ni_name, &tag, xdr_nibind_getregister_res,
res, tv)) != RPC_SUCCESS)
{
msg = clnt_sperrno(clstat);
clnt_destroy_lock(cl, sock);
FD_CLR(sock, &clnt_fdset); /* unprotect client socket */
goto failed;
}
clnt_destroy_lock(cl, sock);
FD_CLR(sock, &clnt_fdset); /* unprotect client socket */
return(TRUE);
failed:
system_log(LOG_WARNING, "%s's tcp binder connection to %s[%s]/%s "
"failed during %s - %s", caller, name, inet_ntoa(sin.sin_addr),
tag, msg_stage, msg);
return(FALSE);
}
/*
* When a clone completes a readall successfully, we should note that
* it's current (i.e., up_to_date), so that additional updates that
* arrive before the next resync cleanup time are propagated.
*/
void
notify_mark_clone(const unsigned long addr)
{
clone_list clist;
clone_list clone;
clone_list found_clone = NULL;
struct in_addr t_addr;
/*
* Find the clone that did the readall. Mark it as current.
* XXX If there are multiple interfaces for this clone's host,
* and the readall came from an interface not listed as running
* a clone (in the master's /machines), then it's not on our
* clone list, and it won't be marked as up_to_date. If there
* are multiple clones running on one host, we haven't enough
* information to distinguish among them: all we have is the
* address of the requesting clone, not its tag. So, we punt
* on that one, too. XXX
*/
clist = clonelist_retain(global_clone_list);
for (clone = clist; clone != NULL; clone = clone->next)
{
/* Search for the clone. */
if (addr == clone->addr)
{
if (NULL == found_clone)
{
found_clone = clone;
}
else
{
/* Already have one! */
t_addr.s_addr = addr;
system_log(LOG_DEBUG,
"Multiple clones at %s[%s]; can't mark up_to_date",
clone->name, inet_ntoa(t_addr));
clonelist_release(clist);
return;
}
}
}
if (NULL == found_clone)
{
t_addr.s_addr = addr;
system_log(LOG_INFO,
"readall completed from unregistered clone at %s",
inet_ntoa(t_addr));
clonelist_release(clist);
return;
}
/* Found exactly one clone for this address. Mark it up_to_date. */
t_addr.s_addr = addr;
syslock_lock(found_clone->lock);
found_clone->up_to_date = TRUE;
system_log(LOG_INFO, "clone %s[%s]/%s marked current; was%s being skipped",
found_clone->name, inet_ntoa(t_addr), found_clone->tag,
NO_SEQUENCE_NUMBER == found_clone->lastupdate ? "" : "n't");
syslock_unlock(found_clone->lock);
clonelist_release(clist);
return;
}
syntax highlighted by Code2HTML, v. 0.9.1