#
# Copyright (C) 2006 SIPfoundry Inc.
# Licensed by SIPfoundry under the LGPL license.
# 
# Copyright (C) 2006 Pingtel Corp.
# Licensed to SIPfoundry under a Contributor Agreement.
#
##############################################################################

# system requires
require 'rubygems'            # Ruby packaging and installation framework
require_gem 'activerecord'    # object-relational mapping layer for Rails
require 'logger'              # writes log messages to a file or stream
require 'observer'            # for Observer pattern a.k.a Publish/Subscribe

# set up the load path
thisdir = File.dirname(__FILE__)
$:.unshift(thisdir)
$:.unshift(File.join(thisdir, "app", "models"))

# application requires
require 'call_resolver_configure'
require 'call_state_event'
require 'cdr'
require 'configure'
require 'database_url'
require 'database_utils'
require 'exceptions'


# :TODO: log the number of calls analyzed and how many succeeded vs. dups or
#        failures, also termination status

# The CallResolver analyzes call state events (CSEs) and computes call detail 
# records (CDRs).  It loads CSEs from a database and writes CDRs back into the
# same database.
class CallResolver
  include Observable    # so we can notify Call Resolver plugins of events

  # Constants

  LOCALHOST = 'localhost'

  # How many seconds are there in a day
  SECONDS_IN_A_DAY = 86400

  # Names of events that we send to plugins.
  EVENT_NEW_CDR = 'a new CDR has been created in the database'

  # SQL command to garbage-collect and analyze a PostgreSQL database
  POSTGRESQL_TUNE_UP_DATABASE = "VACUUM ANALYZE"

public

  # Methods

  # Create a Call Resolver.  config_file (possibly nil) is handed unchanged to
  # CallResolverConfigure.new; the resulting configuration is what all the
  # per-parameter accessors defined below forward to.
  def initialize(config_file = nil)
    @config = CallResolverConfigure.new(config_file)
  end

  # Run daily processing, including purging and/or call resolution.
  #
  # If the daily run is enabled in the configuration, resolve calls from the
  # start time of the most recent CDR (or one day ago, if there are no CDRs)
  # up to now.  Then, if purging is enabled in the configuration or purge_flag
  # is true, purge old CSEs and CDRs.
  #
  # purge_flag - when true, purge even if the configuration doesn't enable it.
  # purge_time - when nonzero, purge both CSEs and CDRs older than this many
  #              days, overriding the configured purge start times.
  def daily_run(purge_flag = false, purge_time = 0)
    do_purge = config.purge?

    connect_to_cdr_database

    # Resolve CDRs if enabled
    if config.daily_run?
      start_time = get_daily_start_time
      end_time = Time.now()
      log.debug{"Daily run - resolve start: #{start_time}, end: #{end_time}"}
      resolve(start_time, end_time)
    else
      log.error("resolve: the --daily_flag is set, but the daily run is disabled in the configuration")
    end

    # Purge if enabled
    if do_purge || purge_flag
      if purge_time != 0
        # An explicit purge age (in days) overrides the configured start times
        purge_start_cdr = Time.now() - (SECONDS_IN_A_DAY * purge_time)
        purge_start_cse = purge_start_cdr
        log.info("Purge override CSEs: #{purge_start_cse}, CDRs: #{purge_start_cdr}")
      else
        purge_start_cdr = config.purge_start_time_cdr
        purge_start_cse = config.purge_start_time_cse
        log.info("Normal purge CSEs: #{purge_start_cse}, CDRs: #{purge_start_cdr}")
      end
      purge(purge_start_cse, purge_start_cdr)
    end
  end

  # Resolve the CSEs in the time window [start_time, end_time) into CDRs and
  # save them, inside a single Cdr.transaction.
  #
  # start_time - when nil, defaults to one day before now.
  # end_time   - when nil, defaults to one day after start_time.
  #
  # Any StandardError raised while resolving is logged together with its
  # backtrace and is NOT propagated to the caller.
  def resolve(start_time, end_time)
    connect_to_cdr_database

    begin
      Cdr.transaction do
        # Default the start time to 1 day before now
        start_time ||= Time.now() - SECONDS_IN_A_DAY

        # Default the end_time to 1 day after the start time
        end_time ||= start_time + SECONDS_IN_A_DAY

        start_run = Time.now
        log.info("resolve: Resolving calls from #{start_time.to_s} to " +
                 "#{end_time.to_s}.  Running at #{start_run}.")

        # Load all CSEs in the time window.  The call_map is a hash where each key
        # is a call ID and the value is an array of events for that call ID,
        # sorted by time.
        # :TODO: For performance/scalability (XPR-144) we can't just load all the
        # data at once.  Split the time window into subwindows and do one
        # subwindow at a time, noting incomplete CDRs and carrying those calls
        # forward into the next subwindow.
        call_map = load_distrib_events_in_time_window(start_time, end_time)

        # Resolve each call to yield 0-1 CDRs.  Save the CDRs.
        call_map.each_value do |call|
          resolve_call(call)
        end

        end_run = Time.now
        log.info("resolve: Done at #{end_run}.  Analysis took #{end_run - start_run} seconds.")
      end
    rescue
      # Backstop exception handler: don't let any exceptions propagate back
      # to the caller.  Log the error and the stack trace.  The log message has to
      # go on a single line, unfortunately.  Embed "\n" for syslogviewer.
      start_line = "\n        from "    # start each backtrace line with this
      # Array() tolerates an exception with no backtrace (nil); joining with
      # start_line yields the same text as chaining the lines pairwise.
      log.error("Exiting because of error: \"#{$!}\"" + start_line +
                Array($!.backtrace).join(start_line))
    end
  end

  # Give public access to the config
  attr_reader :config

private

  # For each config param, define an accessor that forwards to the config.
  CallResolverConfigure::ALL_PARAMS.each do |sym|
    self.class_eval(%{
      def #{sym}
        config.#{sym}
      end
    })
  end

  # Allow other components to use the Call Resolver log, since all logging
  # should go to a single shared file. We have defined the readers above, make
  # them public.
  public(:log, :log_device)

  # Connect to the CDR database, if not already connected.  The CDR database
  # is the default database for all models.  Because there is only one CDR
  # database, the caller doesn't need to provide an URL.
  def connect_to_cdr_database
    unless ActiveRecord::Base.connected?
      ActiveRecord::Base.establish_connection(cdr_database_url.to_hash)
    end
  end

  # Connect the CallStateEvent class to the CSE database at the specified URL.
  # With HA there are multiple CSE databases.
  def connect_to_cse_database(db_url)
    log.debug{"connect_to_cse_database: #{db_url}"}
    CallStateEvent.establish_connection(db_url.to_hash)
  end

  # Resolve the call, given an events array for that call (all events share a
  # call ID, sorted by time), to yield 0-1 CDRs.  Save any CDR produced.
  # A CallResolverException for this call is logged and swallowed so that one
  # bad call doesn't abort the whole run; other exceptions propagate.
  def resolve_call(events)
    call_id = events[0].call_id
    log.debug {
      # Build a string listing the event types
      event_types_str =
        events.inject('') {|msg, event| msg + event.event_type + ' '}
      event_types_str[-1] = ')'
      "Resolving a call: call ID = #{call_id} with #{events.length} events (" + event_types_str}

    begin
      call_req = find_call_request(events)
      if call_req
        # Read info from the call request and start constructing the CDR.
        cdr = start_cdr(call_req)

        # The forking proxy might ring multiple phones.  The dialog with each
        # phone is a separate call leg.
        # Pick the call leg with the best outcome and longest duration to be the
        # basis for the CDR.
        to_tag = best_call_leg(events)

        if to_tag                 # if there are any call legs worth examining
          # Fill the CDR from the call leg events.  The returned status is true
          # if that succeeded, false otherwise.
          status = finish_cdr(cdr, events, to_tag)

          if status
            log.debug do
              "resolve_call: Resolved a call from #{cdr.caller_aor} to " +
              "#{cdr.callee_aor} at #{cdr.start_time}, call status = " +
              "#{cdr.termination_text}"
            end

            save_cdr(cdr)
          end
        else
          log.debug {"resolve_call: no good call legs found, discarding this call"}
        end
      end
    # Don't let a single bad call blow up the whole run, if the exception was
    # raised by our code.  If it's an exception that we don't know about, then
    # bail out.
    rescue CallResolverException
      log.error("resolve_call: error with call ID = \"#{call_id}\", no CDR created: #{$!}")
    end
  end

  # Load all events in the time window from HA distributed servers.
  # Return a hash where the key is a call ID and values
  # are event arrays for that call ID, sorted by time.
  # A database that fails to load is logged and skipped.
  def load_distrib_events_in_time_window(start_time, end_time)
    call_map = {}

    # all_calls holds arrays, one array of event subarrays for each database.
    # Each subarray holds events for one call, sorted by time.
    all_calls = []

    cse_database_urls.each do |db_url|
      connect_to_cse_database(db_url)

      # Load events from this database
      events = []
      begin
        events = load_events_in_time_window(start_time, end_time)
        log.debug{"load_distrib_events_in_time_window: loaded #{events.length} " +
                  "events from #{db_url}"}
      rescue
        log.error("load_distrib_events_in_time_window: Unable to load events " +
                  "from the database #{db_url}. The error was: \"#{$!}\".")
      end

      # Divide the events into subarrays, one for each call.  Save the result.
      if events.length > 0
        calls = split_events_by_call(events)
        all_calls << calls
      end
    end

    # Put the event arrays in the hash table, merging on collisions
    merge_events_for_call(all_calls, call_map)

    call_map
  end

  # Put call event arrays in the hash table, merging on collisions. The
  # all_calls arg is an array of arrays, one for each CSE database. Each array
  # contains event arrays, where each event array is for a single call.
  # Typically each call comes completely from one server, but that is not
  # always true, so partial calls from different servers are merged.
  def merge_events_for_call(all_calls, call_map)
    all_calls.each do |calls|
      calls.each do |call|
        call_id = call[0].call_id
        entry = call_map[call_id]
        if entry
          # Merge this partial call into the existing one and re-sort by time.
          # "entry + call" builds a NEW array, which is why the hash entry must
          # be reassigned rather than relying on in-place modification.
          call_map[call_id] =
            (entry + call).sort {|x, y| x.event_time <=> y.event_time}
        else
          # create a new hash entry
          call_map[call_id] = call
        end
      end
    end
  end

  # Given an events array where the events for a given call are contiguous,
  # split the array into subarrays, one for each call.  Return the array of
  # subarrays.  Runs in a single pass over the events.
  def split_events_by_call(events)
    calls = []
    current_call = nil                  # subarray for the call being collected
    events.each do |event|
      if current_call && current_call[0].call_id == event.call_id
        current_call << event
      else
        # First event of a new call: start a new subarray
        current_call = [event]
        calls << current_call
      end
    end
    calls
  end

  # Load all events in the time window [start_time, end_time), sorted by call
  # ID then by time.  Return the events in an array.
  def load_events_in_time_window(start_time, end_time)
    events =
      CallStateEvent.find(
        :all,
        :conditions => "event_time >= '#{start_time}' and event_time < '#{end_time}'",
        :order => "call_id, event_time")
    log.debug{"load_events: loaded #{events.length} events between #{start_time} and #{end_time}"}
    events
  end

  # Find the most recently started CDR (if there is one) and return its
  # start time.  If there is no CDR, return now - 24 hrs.
  def get_daily_start_time
    last_cdr = Cdr.find(
                        :first,
                        :order => "start_time DESC")
    if last_cdr == nil
      start_time = Time.now() - SECONDS_IN_A_DAY
    else
      start_time = last_cdr.start_time
    end
    start_time
  end

  # Load all events with the given call_id, in ascending chronological order.
  # The call ID comes from SIP traffic and may contain quote characters, so it
  # is passed as a bound parameter rather than spliced into the SQL.
  def load_events_with_call_id(call_id)
    events =
      CallStateEvent.find(
        :all,
        :conditions => ["call_id = ?", call_id],
        :order => "event_time")
    log.debug {"load_events: loaded #{events.length} events with call_id = #{call_id}"}
    events
  end

  # Load the distinct call IDs for the specified time window.
  # NOTE(review): unlike load_events_in_time_window, this window includes
  # end_time (<=).  Confirm which bound is intended before relying on it.
  def load_call_ids(start_time, end_time)
    # This query returns an array of objects of class CallStateEvent, but they
    # only have the call_id attribute defined.  Convert to an array of strings
    # and return that.
    results = CallStateEvent.find_by_sql(
      "select distinct call_id from call_state_events " +
      "where event_time >= '#{start_time}' and event_time <= '#{end_time}'")
    results.collect do |obj|
      obj.call_id
    end
  end

  # Given an events array sorted by time, find the call request event, which
  # represents a SIP INVITE message. Return nil if there are no call requests.
  # If there are multiple requests, then pick original INVITEs (no to_tag).
  # If there's more than one of those, then pick the earliest.  If there are
  # none of those, pick the earliest request.
  # Note that in the HA case events may be logged on multiple machines, with
  # imperfectly synchronized clocks.
  def find_call_request(events)
    call_request = nil        # return value
    m = 'find_call_request'   # method name, for debug logging

    # Find call requests
    requests = events.find_all {|event| event.call_request?}
    if requests.length == 1
      call_request = requests[0]
      log.debug {"#{m}: found one call request: #{call_request.to_s}"}
    elsif requests.length == 0
      log.debug {"#{m}: found no call requests"}
    else                  # requests.length > 1
      log.debug {
        "#{m}: found #{requests.length} call requests: " +
        Utils.events_to_s(requests)}

      # Among the call requests, find original INVITEs
      originals = requests.find_all {|event| !event.to_tag}
      if originals.length == 1
        call_request = originals[0]
        log.debug{"#{m}: found one original call request: #{call_request.to_s}"}
      elsif originals.length == 0
        call_request = requests[0]
        log.debug{"#{m}: found no original call requests, " +
                  "use the earliest non-original request: #{call_request.to_s}"}
      else                # originals.length > 1
        call_request = originals[0]
        log.debug {
          "#{m}: found #{originals.length} original call requests: " +
          Utils.events_to_s(originals) +
          "\nuse the earliest: #{call_request.to_s}"}
      end
    end

    call_request
  end

  # Read info from the call request event and start constructing the CDR.
  # The CDR starts out with termination CALL_REQUESTED_TERM.
  # Return the new (unsaved) CDR.
  def start_cdr(call_req)
    cdr = Cdr.new(:call_id => call_req.call_id,
                  :from_tag => call_req.from_tag,
                  :caller_aor => call_req.caller_aor,
                  :callee_aor => call_req.callee_aor,
                  :start_time => call_req.event_time,
                  :termination => Cdr::CALL_REQUESTED_TERM)
    cdr.caller_contact = Utils.contact_without_params(call_req.contact)

    cdr
  end

  # Pick the call leg with the best outcome and longest duration to be the
  # basis for the CDR.  Return the to_tag for that call leg.  A call leg is a
  # set of events with a common to_tag.
  # Return nil if there is no such call leg.
  def best_call_leg(events)     # array of events with a given call ID
    to_tag = nil                # result: the to_tag for the best call leg

    # Is there at least one call_end event?
    has_call_end = events.any? {|event| event.call_end?}

    # Find the call leg with the best outcome and longest duration. If there is
    # a call_end event, then look for the call end event with the biggest
    # timestamp. Otherwise look for the call failure event with the biggest
    # timestamp. Events have already been sorted for us in timestamp order.
    final_event_type = has_call_end ?
                       CallStateEvent::CALL_END_TYPE :
                       CallStateEvent::CALL_FAILURE_TYPE
    events.reverse_each do |event|
      if event.event_type == final_event_type
        to_tag = event.to_tag
        log.debug {"best_call_leg: to_tag is #{to_tag} for final event #{event.to_s}"}
        break
      end
    end

    if !to_tag
      # If there is no final event, then try to at least find a call_setup event
      events.reverse_each do |event|
        if event.call_setup?
          to_tag = event.to_tag
          log.debug {"best_call_leg: to_tag is #{to_tag} for setup event #{event.to_s}"}
          break
        end
      end
    end

    if !to_tag
      log.debug {"best_call_leg: could not find a final event or a setup event"}
    end

    to_tag
  end

  # Fill in the CDR from a call leg consisting of the events whose to_tag or
  # from_tag equals the given to_tag.  Return true if successful, false
  # otherwise.
  def finish_cdr(cdr, events, to_tag)
    status = false                # return value: did we fill in the CDR?

    # Get the events for the call leg
    call_leg = events.find_all {|event| event.to_tag == to_tag ||
                                        event.from_tag == to_tag}

    # Find the call_setup event
    call_setup = call_leg.find {|event| event.call_setup?}

    if call_setup
      # The call was set up, so mark it provisionally as in progress.
      cdr.termination = Cdr::CALL_IN_PROGRESS_TERM

      # We have enough data now to build the CDR.
      status = true

      # Get data from the call_setup event
      cdr.callee_contact = Utils.contact_without_params(call_setup.contact)
      cdr.to_tag = call_setup.to_tag
      cdr.connect_time = call_setup.event_time

      # Get data from the call_end or call_failure event
      call_end = call_leg.find {|event| event.call_end?}
      if call_end
        cdr.termination = Cdr::CALL_COMPLETED_TERM    # successful completion
        cdr.end_time = call_end.event_time
      else
        # Couldn't find a call_end event, try for call_failure
        status = handle_call_failure(call_leg, cdr)
      end
    else
      # No call_setup event, so look for a call_failure event
      status = handle_call_failure(call_leg, cdr)
    end

    status
  end

  # Look for a call_failure event in the call_leg events array.  If we find one,
  # then fill in CDR info and return success.  Otherwise return failure.
  def handle_call_failure(call_leg, cdr)
    status = false
    call_failure = call_leg.find {|event| event.call_failure?}
    if call_failure
      # found a call_failure event, use it
      # :TODO: consider optimizing space usage by not setting the
      # failure_reason if it is the default value for the failure_status
      # code. For example, the 486 error has the default reason "Busy Here".
      cdr.to_tag = call_failure.to_tag  # may already be filled in from call_setup
      cdr.termination = Cdr::CALL_FAILED_TERM
      cdr.end_time = call_failure.event_time
      cdr.failure_status = call_failure.failure_status
      cdr.failure_reason = call_failure.failure_reason
      status = true
    end
    status
  end

  # Save the CDR.  Be sure not to create a duplicate CDR for the same dialog:
  # if a complete CDR with the same call ID already exists, leave it alone; if
  # an incomplete one exists, replace it.  Plugins are notified (via
  # notify_observers with EVENT_NEW_CDR) before the new CDR is saved.
  # Return the CDR now in the database (the existing or the new one).
  # (:TODO: is there a way to avoid querying for the existence of a prior CDR
  # here?  Or maybe we could query just by call_id first and index that column?)
  #
  # A failed save surfaces as an exception from save!; the defensive check
  # after it raises via Utils.raise_exception.
  #
  # Assume that this is the only process/thread creating CDRs for these calls.
  # Therefore we don't have to worry about the race condition where after we
  # check for the existence of the CDR and before we save it, another thread
  # saves a CDR with the same call ID.
  def save_cdr(cdr)
    # Continue only if a complete CDR doesn't already exist
    db_cdr = find_cdr(cdr)

    # If we found an existing CDR, then log that for debugging
    if db_cdr
      log.debug do
        "save_cdr: found an existing CDR.  Call ID = #{db_cdr.call_id}, " +
        "termination = #{db_cdr.termination}, ID = #{db_cdr.id}."
      end
    end

    # Save the CDR as long as there is no existing, complete CDR for that call ID.
    if !db_cdr || !db_cdr.complete?
      # Call the Observable method indicating a state change.
      changed

      # Notify plugins of the new CDR.
      # "notify_observers" is a method of the Observable module, which is mixed
      # in to the CallResolver.
      notify_observers(EVENT_NEW_CDR,       # event type
                       cdr)                 # the new CDR

      # Save the CDR. If there is an incomplete CDR already, then replace it.
      # Call "save!" rather than "save" so that we'll get an exception if the
      # save fails.
      # :TODO: Perhaps just copy values into the CDR in the DB and update it,
      # rather than doing a delete/insert.
      if db_cdr
        db_cdr.destroy
      end
      if cdr.save!
        db_cdr = cdr
      else
        Utils.raise_exception('save_cdr: save failed')
      end
    end

    db_cdr
  end

  # Given an in-memory CDR, find the CDR with the same call ID in the database
  # and return it.  If there is no such CDR in the database, then return nil.
  # The call ID comes from SIP traffic and may contain quote characters, so it
  # is passed as a bound parameter rather than spliced into the SQL.
  def find_cdr(cdr)
    if !cdr.call_id
      Utils.raise_exception('find_cdr: call_id is nil', ArgumentError)
    end

    Cdr.find(
      :first,
      :conditions => ["call_id = ?", cdr.call_id])
  end

  # Purge records from the call state event and CDR tables: delete CSEs whose
  # event_time is <= start_time_cse and CDRs whose end_time is <=
  # start_time_cdr, then run VACUUM ANALYZE on the CDR connection.
  def purge(start_time_cse, start_time_cdr)
    connect_to_cdr_database

    # Purge CSEs
    CallStateEvent.transaction do
      log.debug{"purge: purging CSEs where event_time <= #{start_time_cse}"}
      CallStateEvent.delete_all(["event_time <= '#{start_time_cse}'"])
    end

    # Purge CDRs
    Cdr.transaction do
      log.debug{"purge: purging CDRs where end_time <= #{start_time_cdr}"}
      Cdr.delete_all(["end_time <= '#{start_time_cdr}'"])
    end

    # Garbage-collect deleted records and tune performance.
    # See http://www.postgresql.org/docs/8.0/static/sql-vacuum.html .
    # :TODO: For HA, purge multiple databases, both CSE and CDR
    # Must be done outside of a transaction or we'll get an error.
    ActiveRecord::Base.connection.execute(POSTGRESQL_TUNE_UP_DATABASE)
  end

end    # class CallResolver


# syntax highlighted by Code2HTML, v. 0.9.1