#
# Copyright (C) 2006 SIPfoundry Inc.
# Licensed by SIPfoundry under the LGPL license.
#
# Copyright (C) 2006 Pingtel Corp.
# Licensed to SIPfoundry under a Contributor Agreement.
#
##############################################################################
# system requires
require 'rubygems' # Ruby packaging and installation framework
require_gem 'activerecord' # object-relational mapping layer for Rails
require 'logger' # writes log messages to a file or stream
require 'observer' # for Observer pattern a.k.a Publish/Subscribe
# set up the load path
thisdir = File.dirname(__FILE__)
$:.unshift(thisdir)
$:.unshift(File.join(thisdir, "app", "models"))
# application requires
require 'call_resolver_configure'
require 'call_state_event'
require 'cdr'
require 'configure'
require 'database_url'
require 'database_utils'
require 'exceptions'
# :TODO: log the number of calls analyzed and how many succeeded vs. dups or
# failures, also termination status
# The CallResolver analyzes call state events (CSEs) and computes call detail
# records (CDRs). It loads CSEs from a database and writes CDRs back into the
# same database.
class CallResolver
include Observable # so we can notify Call Resolver plugins of events
# Constants
# Host name constant. It is not referenced in the visible part of this class.
LOCALHOST = 'localhost'
# How many seconds are there in a day (24 * 60 * 60). Used for the default
# resolve window and to convert a purge age given in days into seconds.
SECONDS_IN_A_DAY = 86400
# Names of events that we send to plugins. save_cdr passes this value as the
# first argument of notify_observers, together with the CDR being saved.
EVENT_NEW_CDR = 'a new CDR has been created in the database'
# SQL command to garbage-collect and analyze a PostgreSQL database. purge
# executes it after deleting old records.
POSTGRESQL_TUNE_UP_DATABASE = "VACUUM ANALYZE"
# Methods defined from here up to the "private" marker are public.
public
# Methods
# Create a resolver. All settings are read through a CallResolverConfigure
# built for config_file; passing nil (the default) hands nil on to the
# configuration object, which decides what that means.
def initialize(config_file = nil)
  @config = CallResolverConfigure.new(config_file)
end
# Daily entry point: optionally resolve calls, then optionally purge old data.
#
# purge_flag - forces a purge even when purging is disabled in the config.
# purge_time - purge age in days; 0 means "use the configured purge times".
#
# Resolution only runs when the daily run is enabled in the configuration; it
# covers the window from get_daily_start_time up to now. Purging runs when the
# configuration or purge_flag asks for it, whether or not resolution ran.
# Returns whatever purge returns, or nil when no purge was done.
def daily_run(purge_flag = false, purge_time = 0)
  purge_configured = config.purge?
  connect_to_cdr_database

  if config.daily_run?
    window_start = get_daily_start_time
    window_end = Time.now
    log.debug { "Daily run - resolve start: #{window_start}, end: #{window_end}" }
    resolve(window_start, window_end)
  else
    log.error("resolve: the --daily_flag is set, but the daily run is disabled in the configuration")
  end

  return nil unless purge_configured || purge_flag

  if purge_time == 0
    # No explicit age: take both cut-off times from the configuration.
    cdr_cutoff = config.purge_start_time_cdr
    cse_cutoff = config.purge_start_time_cse
    log.info("Normal purge CSEs: #{cse_cutoff}, CDRs: #{cdr_cutoff}")
  else
    # An explicit age in days overrides the configuration for both tables.
    cdr_cutoff = Time.now - (SECONDS_IN_A_DAY * purge_time)
    cse_cutoff = cdr_cutoff
    log.info("Purge override CSEs: #{cse_cutoff}, CDRs: #{cdr_cutoff}")
  end
  purge(cse_cutoff, cdr_cutoff)
end
# Turn the call state events (CSEs) of a time window into CDRs.
#
# start_time - beginning of the window; nil means one day before now.
# end_time   - end of the window; nil means one day after start_time.
#
# All work happens inside one Cdr transaction. Any StandardError is caught
# here, logged with its backtrace and not propagated to the caller.
def resolve(start_time, end_time)
  connect_to_cdr_database
  begin
    Cdr.transaction do
      start_time ||= Time.now - SECONDS_IN_A_DAY
      end_time ||= start_time + SECONDS_IN_A_DAY

      started_at = Time.now
      log.info("resolve: Resolving calls from #{start_time.to_s} to " +
               "#{end_time.to_s}. Running at #{started_at}.")

      # calls_by_id maps a call ID to that call's events, sorted by time.
      # :TODO: For performance/scalability (XPR-144) we can't just load all
      # the data at once. Split the time window into subwindows and do one
      # subwindow at a time, noting incomplete CDRs and carrying those calls
      # forward into the next subwindow.
      calls_by_id = load_distrib_events_in_time_window(start_time, end_time)

      # Each call yields zero or one CDR, saved by resolve_call.
      calls_by_id.each_value { |call_events| resolve_call(call_events) }

      finished_at = Time.now
      log.info("resolve: Done at #{finished_at}. Analysis took #{finished_at - started_at} seconds.")
    end
  rescue
    # Backstop handler. The log entry has to be a single message, so the
    # backtrace lines are glued together with an embedded newline prefix
    # (for syslogviewer).
    separator = "\n from "
    trace = $!.backtrace.inject { |text, frame| text + separator + frame }
    log.error("Exiting because of error: \"#{$!}\"" + separator + trace)
  end
end
# Give public access to the config (this reader is declared before the
# "private" marker below, so it stays public).
attr_reader :config
# Everything defined after this marker is private unless made public again.
private
# For each config param, define an accessor that forwards to the config.
# The method source is built as a string per symbol in
# CallResolverConfigure::ALL_PARAMS and evaluated in the class, so e.g. a
# param named "foo" yields a method "foo" that returns "config.foo".
CallResolverConfigure::ALL_PARAMS.each do |sym|
self.class_eval(%{
def #{sym}
config.#{sym}
end
})
end
# Allow other components to use the Call Resolver log, since all logging
# should go to a single shared file. We have defined the readers above, make
# them public.
# NOTE(review): this relies on ALL_PARAMS containing :log and :log_device;
# that list is defined outside this file -- confirm.
# Calling "public" with arguments changes only the named methods; methods
# defined below this line are still private.
public(:log, :log_device)
# Make sure ActiveRecord::Base has a connection to the CDR database, which is
# the default database for all models. Nothing happens when a connection
# already exists. There is only one CDR database, so the URL is taken from the
# cdr_database_url config accessor rather than from an argument.
def connect_to_cdr_database
  return if ActiveRecord::Base.connected?
  ActiveRecord::Base.establish_connection(cdr_database_url.to_hash)
end
# Point the CallStateEvent model at the CSE database described by db_url.
# With HA there are several CSE databases, so this is called once per URL.
# db_url must respond to to_hash, yielding the connection parameters.
def connect_to_cse_database(db_url)
  log.debug do
    "connect_to_cse_database: #{db_url}"
  end
  connection_params = db_url.to_hash
  CallStateEvent.establish_connection(connection_params)
end
# Resolve one call. events holds all events of that call (non-empty, sorted by
# time); the outcome is zero or one CDR, which is persisted via save_cdr.
#
# A CallResolverException raised while processing this call is logged and
# swallowed so that one bad call does not abort the whole run. Any other
# exception is deliberately left to propagate to the caller.
def resolve_call(events)
  call_id = events[0].call_id
  log.debug do
    # List the event types, space separated, closing the parenthesis that the
    # message opens.
    type_list = events.inject('') { |text, ev| text + ev.event_type + ' ' }
    "Resolving a call: call ID = #{call_id} with #{events.length} events (" +
      type_list.chop + ')'
  end

  begin
    call_req = find_call_request(events)
    return nil unless call_req

    # The call request provides the starting data of the CDR.
    cdr = start_cdr(call_req)

    # A forking proxy may ring several phones; the dialog with each phone is
    # a separate call leg. The leg with the best outcome and longest duration
    # becomes the basis of the CDR.
    to_tag = best_call_leg(events)
    unless to_tag
      return log.debug { "resolve_call: no good call legs found, discarding this call" }
    end

    # finish_cdr reports whether the CDR could be filled in from that leg.
    return nil unless finish_cdr(cdr, events, to_tag)

    log.debug do
      "resolve_call: Resolved a call from #{cdr.caller_aor} to " +
        "#{cdr.callee_aor} at #{cdr.start_time}, call status = " +
        "#{cdr.termination_text}"
    end
    save_cdr(cdr)
  rescue CallResolverException
    log.error("resolve_call: error with call ID = \"#{call_id}\", no CDR created: #{$!}")
  end
end
# Collect the events of the time window from every CSE database (the HA
# distributed servers listed by cse_database_urls).
#
# Returns a hash keyed by call ID whose values are the event arrays for that
# call, sorted by time. A database whose events cannot be loaded is logged
# and skipped; the remaining databases are still processed.
def load_distrib_events_in_time_window(start_time, end_time)
  # One entry per database; each entry is an array of per-call event arrays.
  calls_per_database = []

  cse_database_urls.each do |db_url|
    connect_to_cse_database(db_url)

    loaded = []
    begin
      loaded = load_events_in_time_window(start_time, end_time)
      log.debug do
        "load_distrib_events_in_time_window: loaded #{loaded.length} " +
          "events from #{db_url}"
      end
    rescue
      log.error("load_distrib_events_in_time_window: Unable to load events " +
                "from the database #{db_url}. The error was: \"#{$!}\".")
    end

    # Group this database's events by call, unless it had none.
    calls_per_database << split_events_by_call(loaded) if loaded.length > 0
  end

  # Fold all databases into one hash, merging calls seen on several servers.
  merged = {}
  merge_events_for_call(calls_per_database, merged)
  merged
end
# Fill call_map (call ID => event array) from all_calls.
#
# all_calls has one element per CSE database; each element is an array of
# event arrays, one per call. Usually a call comes entirely from one server,
# but when the same call ID shows up more than once the partial event lists
# are concatenated and re-sorted by event_time.
def merge_events_for_call(all_calls, call_map)
  all_calls.each do |db_calls|
    db_calls.each do |call_events|
      key = call_events[0].call_id
      existing = call_map[key]
      call_map[key] =
        if existing
          # Concatenation builds a new array, so the merged and re-sorted
          # result has to be stored back into the hash explicitly.
          (existing + call_events).sort { |a, b| a.event_time <=> b.event_time }
        else
          # First time this call ID is seen.
          call_events
        end
    end
  end
end
# Cut an events array into one subarray per call and return the array of
# subarrays. The events of a call are expected to be contiguous: a new
# subarray starts whenever an event's call ID differs from the call ID of
# the first event in the current subarray.
def split_events_by_call(events)
  groups = []
  events.each do |event|
    current = groups.last
    if current.nil? || event.call_id != current[0].call_id
      groups << [event]   # first event of the next call
    else
      current << event    # same call as the group in progress
    end
  end
  groups
end
# Fetch the events whose event_time lies in the half-open window
# [start_time, end_time) from the database CallStateEvent is connected to.
# The result is an array ordered by call ID, then by event time.
def load_events_in_time_window(start_time, end_time)
  window = "event_time >= '#{start_time}' and event_time < '#{end_time}'"
  found = CallStateEvent.find(:all,
                              :conditions => window,
                              :order => "call_id, event_time")
  log.debug do
    "load_events: loaded #{found.length} events between #{start_time} and #{end_time}"
  end
  found
end
# Work out where the daily run should start: the start_time of the CDR with
# the latest start_time, or 24 hours before now when there is no CDR yet.
def get_daily_start_time
  latest_cdr = Cdr.find(:first, :order => "start_time DESC")
  return Time.now - SECONDS_IN_A_DAY if latest_cdr.nil?
  latest_cdr.start_time
end
# Load all events with the given call_id, in ascending chronological order.
# Returns the events as an array.
#
# Call IDs originate from SIP messages, i.e. from the network, so the value
# is passed to the database as a bound parameter instead of being spliced
# into the SQL text. A call ID containing a quote character can therefore
# neither break nor alter the query.
def load_events_with_call_id(call_id)
  events =
    CallStateEvent.find(
      :all,
      :conditions => ["call_id = ?", call_id],
      :order => "event_time")
  log.debug { "load_events: loaded #{events.length} events with call_id = #{call_id}" }
  events
end
# Return the distinct call IDs, as an array, of all events whose event_time
# lies in the closed window [start_time, end_time].
def load_call_ids(start_time, end_time)
  sql = "select distinct call_id from call_state_events " +
        "where event_time >= '#{start_time}' and event_time <= '#{end_time}'"
  # find_by_sql yields CallStateEvent objects that only carry the call_id
  # attribute; reduce them to the bare call IDs.
  CallStateEvent.find_by_sql(sql).map { |row| row.call_id }
end
# Pick the call request event (a SIP INVITE) out of an events array that is
# sorted by time. Returns nil when the call has no call request at all.
#
# Selection rules when there are several requests:
# * prefer original INVITEs, i.e. requests without a to_tag;
# * among several originals take the earliest;
# * if there is no original, take the earliest request of any kind.
#
# In the HA case events may have been logged on several machines with
# imperfectly synchronized clocks, so "earliest" is only approximate.
def find_call_request(events)
  label = 'find_call_request' # method name, for debug logging
  candidates = events.select { |ev| ev.call_request? }

  case candidates.length
  when 0
    log.debug { "#{label}: found no call requests" }
    return nil
  when 1
    only = candidates[0]
    log.debug { "#{label}: found one call request: #{only.to_s}" }
    return only
  end

  log.debug do
    "#{label}: found #{candidates.length} call requests: " +
      Utils.events_to_s(candidates)
  end

  # Original INVITEs are the requests that carry no to_tag.
  originals = candidates.select { |ev| !ev.to_tag }
  if originals.empty?
    chosen = candidates[0]
    log.debug do
      "#{label}: found no original call requests, " +
        "use the earliest non-original request: #{chosen.to_s}"
    end
  elsif originals.length == 1
    chosen = originals[0]
    log.debug { "#{label}: found one original call request: #{chosen.to_s}" }
  else
    chosen = originals[0]
    log.debug do
      "#{label}: found #{originals.length} original call requests: " +
        Utils.events_to_s(originals) +
        "\nuse the earliest: #{chosen.to_s}"
    end
  end
  chosen
end
# Begin a CDR from the call request event and return it (unsaved).
# The call ID, from tag, caller/callee AORs and start time are copied from
# the request, the termination is set to "call requested", and the caller
# contact is stored after passing through Utils.contact_without_params.
def start_cdr(call_req)
  attributes = {
    :call_id => call_req.call_id,
    :from_tag => call_req.from_tag,
    :caller_aor => call_req.caller_aor,
    :callee_aor => call_req.callee_aor,
    :start_time => call_req.event_time,
    :termination => Cdr::CALL_REQUESTED_TERM
  }
  cdr = Cdr.new(attributes)
  cdr.caller_contact = Utils.contact_without_params(call_req.contact)
  cdr
end
# Choose the call leg that the CDR should be based on and return its to_tag,
# or nil when no usable leg exists. A call leg is the set of events sharing
# one to_tag. events is the time-sorted array of events for one call ID.
#
# Preference order:
# 1. if the call has any call_end event, the latest call_end event;
#    otherwise the latest call_failure event;
# 2. failing that (or if that event has no to_tag), the latest call_setup
#    event.
def best_call_leg(events)
  call_ended = events.any? { |ev| ev.call_end? }
  wanted_type = call_ended ?
                CallStateEvent::CALL_END_TYPE :
                CallStateEvent::CALL_FAILURE_TYPE

  best_tag = nil

  # Events are sorted by time, so the first hit when scanning backwards is
  # the one with the biggest timestamp.
  final_event = events.reverse.find { |ev| ev.event_type == wanted_type }
  if final_event
    best_tag = final_event.to_tag
    log.debug { "best_call_leg: to_tag is #{best_tag} for final event #{final_event.to_s}" }
  end

  unless best_tag
    # No final event (or one without a to_tag): settle for a call_setup event.
    setup_event = events.reverse.find { |ev| ev.call_setup? }
    if setup_event
      best_tag = setup_event.to_tag
      log.debug { "best_call_leg: to_tag is #{best_tag} for setup event #{setup_event.to_s}" }
    end
  end

  unless best_tag
    log.debug { "best_call_leg: could not find a final event or a setup event" }
  end
  best_tag
end
# Fill in the CDR from a call leg consisting of the events with the given
# to_tag. Return true if the CDR could be filled in, false otherwise.
#
# cdr    - the CDR started by start_cdr; it is modified in place.
# events - all events of the call, sorted by time.
# to_tag - identifies the call leg; events whose to_tag or from_tag equals it
#          belong to the leg.
#
# Outcomes:
# * call_setup and call_end found     -> completed call, returns true
# * call_setup and call_failure found -> failed call, returns true
# * call_setup only                   -> call still in progress, returns true
# * no call_setup, call_failure found -> failed call, returns true
# * neither                           -> returns false, CDR is unusable
def finish_cdr(cdr, events, to_tag)
  # Get the events for the call leg
  call_leg = events.find_all do |event|
    event.to_tag == to_tag || event.from_tag == to_tag
  end

  call_setup = call_leg.find { |event| event.call_setup? }

  # Without a call_setup event the only usable outcome is a call failure.
  return handle_call_failure(call_leg, cdr) unless call_setup

  # The call was set up, so mark it provisionally as in progress. From here
  # on there is enough data to build the CDR.
  cdr.termination = Cdr::CALL_IN_PROGRESS_TERM
  cdr.callee_contact = Utils.contact_without_params(call_setup.contact)
  cdr.to_tag = call_setup.to_tag
  cdr.connect_time = call_setup.event_time

  call_end = call_leg.find { |event| event.call_end? }
  if call_end
    cdr.termination = Cdr::CALL_COMPLETED_TERM # successful completion
    cdr.end_time = call_end.event_time
  else
    # Couldn't find a call_end event, try for call_failure. Its result is
    # deliberately ignored: when there is no failure event either, the call
    # is simply still in progress and the provisional CDR remains valid.
    handle_call_failure(call_leg, cdr)
  end
  true
end
# Search the call_leg events for the first call_failure event. When one is
# found, copy its to_tag, time, failure status and failure reason into the
# CDR, mark the CDR as failed and return true. Otherwise leave the CDR alone
# and return false.
def handle_call_failure(call_leg, cdr)
  failure = call_leg.find { |ev| ev.call_failure? }
  return false unless failure

  # :TODO: consider optimizing space usage by not setting the failure_reason
  # if it is the default value for the failure_status code. For example, the
  # 486 error has the default reason "Busy Here".
  cdr.to_tag = failure.to_tag # may already be filled in from call_setup
  cdr.termination = Cdr::CALL_FAILED_TERM
  cdr.end_time = failure.event_time
  cdr.failure_status = failure.failure_status
  cdr.failure_reason = failure.failure_reason
  true
end
# Persist the CDR without ever creating a second CDR for the same call.
#
# * A complete CDR already stored for the call ID wins: nothing is saved and
#   the stored CDR is returned.
# * An incomplete stored CDR is destroyed and replaced by the new one.
# * Otherwise the new CDR is simply saved.
#
# Before saving, observers (the Call Resolver plugins) are notified with
# EVENT_NEW_CDR and the CDR. "save!" is used so that a failed save raises.
#
# This assumes that no other process or thread creates CDRs for these calls,
# so the check-then-save sequence needs no protection against races.
# (:TODO: is there a way to avoid querying for the existence of a prior CDR
# here? Or maybe we could query just by call_id first and index that column?)
def save_cdr(cdr)
  existing = find_cdr(cdr)
  if existing
    log.debug do
      "save_cdr: found an existing CDR. Call ID = #{existing.call_id}, " +
        "termination = #{existing.termination}, ID = #{existing.id}."
    end
    # A complete CDR is never replaced.
    return existing if existing.complete?
  end

  # Observable protocol: flag the state change, then notify the plugins.
  changed
  notify_observers(EVENT_NEW_CDR, cdr)

  # :TODO: Perhaps just copy values into the CDR in the DB and update it,
  # rather than doing a delete/insert.
  existing.destroy if existing

  if cdr.save!
    cdr
  else
    Utils.raise_exception('save_helper_cdr: save failed')
    existing
  end
end
# Given an in-memory CDR, find the CDR with the same call ID in the database
# and return it. If there is no such CDR in the database, return nil.
# Raises (via Utils.raise_exception, with ArgumentError) if the in-memory CDR
# has no call_id.
#
# Call IDs originate from SIP messages, i.e. from the network, so the value
# is passed to the database as a bound parameter instead of being spliced
# into the SQL text. A call ID containing a quote character can therefore
# neither break nor alter the query.
def find_cdr(cdr)
  unless cdr.call_id
    Utils.raise_exception('find_cdr: call_id is nil', ArgumentError)
  end
  Cdr.find(
    :first,
    :conditions => ["call_id = ?", cdr.call_id])
end
# Purge old records from the call state event and CDR tables, then let
# PostgreSQL reclaim the space.
#
# start_time_cse - CSEs with event_time <= this value are deleted.
# start_time_cdr - CDRs with end_time <= this value are deleted.
#
# Each delete runs in its own transaction. Returns the result of the final
# tune-up statement.
def purge(start_time_cse, start_time_cdr)
  connect_to_cdr_database

  # Purge CSEs
  CallStateEvent.transaction do
    log.debug { "purge: purging CSEs where event_time <= #{start_time_cse}" }
    CallStateEvent.delete_all(["event_time <= '#{start_time_cse}'"])
  end

  # Purge CDRs. The filter column is end_time, and the log message says so.
  Cdr.transaction do
    log.debug { "purge: purging CDRs where end_time <= #{start_time_cdr}" }
    Cdr.delete_all(["end_time <= '#{start_time_cdr}'"])
  end

  # Garbage-collect deleted records and tune performance.
  # See http://www.postgresql.org/docs/8.0/static/sql-vacuum.html .
  # :TODO: For HA, purge multiple databases, both CSE and CDR
  # Must be done outside of a transaction or we'll get an error.
  ActiveRecord::Base.connection.execute(POSTGRESQL_TUNE_UP_DATABASE)
end
end # class CallResolver
# syntax highlighted by Code2HTML, v. 0.9.1