API example of user presence monitoring

API example of user presence monitoring

You are here:
Estimated reading time: 4 min.

This is a reference client implementation written in Python. It polls the PBX events API to monitor phone and user status events, and uses them to display presence information.

user_monitor.py

#!/usr/bin/python3

"""
Sample python script showing how to monitor PBX user presence using the PBX
API.
"""

import sys
import urllib.request, urllib.parse, urllib.error
import urllib.request, urllib.error, urllib.parse
import logging
import argparse
import ssl

from getpass import getpass
from base64 import b64encode

try:
    import json
except ImportError:
    import simplejson as json

# Attempt number on which api_call() stops retrying and re-raises the error.
MAX_RETRIES = 3
# Passed as ``context=`` to every urlopen() call in this script. Stays None
# unless disable_certificate_verification() replaces it.
SSL_CONTEXT = None

def disable_certificate_verification():
    """Switch the module-wide SSL context to one that verifies nothing.

    Builds a default SSL context, turns off host name checking and
    certificate validation on it, and stores it in SSL_CONTEXT. Does nothing
    when the ssl module has no create_default_context().
    """
    global SSL_CONTEXT
    make_context = getattr(ssl, "create_default_context", None)
    if make_context is None:
        return
    SSL_CONTEXT = make_context()
    # check_hostname must be cleared before verify_mode can be CERT_NONE
    SSL_CONTEXT.check_hostname = False
    SSL_CONTEXT.verify_mode = ssl.CERT_NONE

def api_call(call_url, auth, options=None):
    """Make an API call, return decoded JSON response.

    Uses HTTP Basic authentication (when credentials are given) and retries
    on error.

    See api_call_simple() for simplified version.

    Args:
        call_url: Full URL of the API endpoint.
        auth: Sequence of two strings (user name, password) sent as HTTP
            Basic credentials, or a false value to send no Authorization
            header at all.
        options: Optional mapping of query parameters appended to the URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        IOError: the request still fails on attempt number MAX_RETRIES.
        ValueError: the response still cannot be decoded as JSON on attempt
            number MAX_RETRIES.
    """
    if options:
        call_url += "?" + urllib.parse.urlencode(options)

    # Only build the header when credentials were supplied; without this
    # guard a call with no credentials failed on an undefined variable.
    auth_hdr = None
    if auth:
        credentials = b":".join(part.encode("utf-8") for part in auth)
        auth_hdr = "Basic " + b64encode(credentials).decode("us-ascii")

    logging.debug("calling: {0!r}".format(call_url))

    attempt = 0
    while True:
        attempt += 1
        try:
            req = urllib.request.Request(call_url)
            req.add_header('Accept', 'application/json')
            if auth_hdr is not None:
                req.add_header('Authorization', auth_hdr)
            # The with-block closes the connection even when decoding fails.
            with urllib.request.urlopen(req, context=SSL_CONTEXT) as response:
                result = json.loads(response.read().decode("utf-8"))
            break
        except IOError as err:
            print("I/O error for %s: %s" % (call_url, err))
            if attempt == MAX_RETRIES:
                raise
        except ValueError as err:
            print("Invalid response from %s: %s" % (call_url, err))
            if attempt == MAX_RETRIES:
                raise
    logging.debug("got: {0!r}".format(result))
    return result

def api_call_simple(call_url, api_key, options=None):
    """The minimal implementation of an API call.

    Sends the API key as the ``key`` query parameter and returns the decoded
    JSON response. No retries and no HTTP authentication header.

    Args:
        call_url: Full URL of the API endpoint.
        api_key: API key, sent as the ``key`` query parameter.
        options: Optional mapping of extra query parameters. It is copied,
            so the caller's mapping is left untouched.

    Returns:
        The decoded JSON body of the response.
    """
    # Work on a copy: adding 'key' to the caller's dict would leak the API
    # key into whatever the caller does with that dict afterwards.
    params = dict(options) if options else {}
    params['key'] = api_key
    call_url += "?" + urllib.parse.urlencode(params)
    with urllib.request.urlopen(call_url, context=SSL_CONTEXT) as response:
        return json.load(response)

def load_users(api_url, auth):
    """Fetch the users with status "1" and return them keyed by uuid."""
    listing = api_call(api_url + "users", auth, {"status": "1"})
    by_uuid = {}
    for record in listing:
        by_uuid[record['uuid']] = record
    return by_uuid

def load_phones(api_url, auth):
    """Fetch the phones with status "1" and return them keyed by uuid."""
    listing = api_call(api_url + "phones", auth, {"status": "1"})
    by_uuid = {}
    for record in listing:
        by_uuid[record['uuid']] = record
    return by_uuid

def load_events(api_url, auth, last=None, limit=None, timeout=None):
    """Query the "events" endpoint and return its decoded response.

    Each of ``last``, ``limit`` and ``timeout`` is sent as a query parameter
    of the same name, but only when it is not None.
    """
    candidates = (("last", last), ("limit", limit), ("timeout", timeout))
    query = {key: value for key, value in candidates if value is not None}
    return api_call(api_url + "events", auth, query)

def get_user_name(user):
    """Return the display label of a user record.

    The label is ``username (full_name)``, or just the username when the
    full name is empty. Both keys must be present in ``user``.
    """
    login = user["username"]
    display = user["full_name"]
    if not display:
        return login
    return "%s (%s)" % (login, display)

def dump_user(user, phones):
    """Print one line with the user's label and the status of their phone.

    ``user['phone']`` is looked up in ``phones`` (a mapping keyed by phone
    uuid); "no phone" is printed when it is empty or not in that mapping.
    """
    label = get_user_name(user)
    current = user['phone']
    if current and current in phones:
        print("%s: %s" % (label, phones[current]['status']))
    else:
        print("%s: no phone" % (label,))

def dump_users(users, phones):
    """Print the one-line status of every user, ordered by username."""
    for entry in sorted(users.values(), key=lambda u: u["username"]):
        dump_user(entry, phones)

def dump_user_verbose(user, phones):
    """Print a multi-line description of a user and the phones assigned to it.

    Args:
        user: User record (dict). Must contain "username" and "full_name";
            the phone assignment is read from "phones" when that key is
            present, otherwise from "primary_phone".
        phones: Mapping of phone uuid to phone record, used to resolve the
            phone name and status.
    """
    name = get_user_name(user)
    print("%s (%s):" % (name, user.get("uuid")))
    print("    username: ", user.get("username"))
    print("    full name:", user.get("full_name"))
    if "phones" in user:
        # PBX 4.x
        u_phones = user["phones"] or []
    else:
        # PBX 3.x: present the single primary phone in the same shape as
        # the entries of a "phones" list
        p_phone = user.get("primary_phone")
        u_phones = [{"type": "phone", "phone": p_phone}] if p_phone else []
    # u_phones must not be rebound inside the loop: the check after the loop
    # relies on it to decide whether to print "none".
    for u_phone in u_phones:
        p_type = u_phone["type"]
        if p_type == "phone":
            phone_uuid = u_phone.get("phone")
            if not phone_uuid or phone_uuid not in phones:
                # a phone we have no record of is silently skipped
                continue
            phone = phones[phone_uuid]
            print("        phone: %s: %s" % (phone['phone_username'],
                                            phone['status']))
        elif p_type == "hot-desk":
            print("        hot-desk")
        elif p_type == "number":
            print("        number: %s" % (u_phone.get("number"),))
        elif p_type == "trunk number":
            print("        number: %s on trunk: %s"
                             % (u_phone.get("number"), u_phone.get("trunk")))
        else:
            print("        unrecognized")
    if not u_phones:
        print("        none")
    if "hot_desk" in user:
        hot_desk = user["hot_desk"]
    else:
        # Without a "hot_desk" key, a current phone that differs from the
        # primary phone is treated as the phone the user is logged in on.
        hot_desk = user.get("current_phone")
        if hot_desk == user.get("primary_phone"):
            hot_desk = None
    if hot_desk:
        phone = phones.get(hot_desk)
        if phone:
            print("    logged in on phone: %s (%s)"
                    % (phone["phone_username"], phone["status"]))

def dump_users_verbose(users, phones):
    """Print the verbose description of every user, ordered by username."""
    for entry in sorted(users.values(), key=lambda u: u["username"]):
        dump_user_verbose(entry, phones)


def dump_target(target, users, phones):
    """Print a description of one call target.

    A target with a "user_uuid" is shown as a user plus a detail line that
    depends on its "user_phone_type". Otherwise it is shown as a phone, then
    as a plain number, and failing both as raw JSON.
    """
    user_id = target.get("user_uuid")
    phone_id = target.get("phone_uuid")
    dialled = target.get("number")

    if not user_id:
        # Not a user target: phone, bare number, or something unrecognised.
        if phone_id:
            record = phones.get(phone_id)
            if record:
                print("    phone:", record["phone_username"])
            else:
                print("    unknown phone:", phone_id)
        elif dialled:
            print("    number:", dialled)
        else:
            print("    unknown:", json.dumps(target))
        return

    owner = users.get(user_id)
    if owner:
        print("    user:", get_user_name(owner))
    else:
        print("    unknown user:", user_id)

    kind = target.get("user_phone_type")
    if kind == "phone" or kind == "hot-desk":
        # Both kinds show the phone's username, or the raw uuid when the
        # phone is not in our table; only the label differs.
        record = phones.get(phone_id)
        shown = record["phone_username"] if record else phone_id
        if kind == "phone":
            print("        phone:", shown)
        else:
            print("        hot-desk phone:", shown)
    elif kind == "number":
        print("        number:", dialled)
    elif kind == "trunk number":
        print("        number:", dialled, "on trunk:", target.get("trunk_uuid"))
    elif kind == "forward":
        print("        forwarded to:", dialled)

def dump_call(event, users, phones):
    """Print the details of a call event.

    Shows the call id, the optional event tag, who the call is from (user,
    phone, number, name), what was called, and every target of the call.
    ``event`` must contain "unique_id" and "caller".
    """
    print(" call", event["unique_id"])
    if "event_tag" in event:
        print("  event tag:", event["event_tag"])

    origin = event["caller"]
    if "user_uuid" in origin:
        caller_id = origin["user_uuid"]
        known_user = users.get(caller_id)
        if known_user:
            print("  from user:", get_user_name(known_user))
        else:
            print("  from unknown user:", caller_id)
    if "phone_uuid" in origin:
        device_id = origin["phone_uuid"]
        device = phones.get(device_id)
        if not device:
            print("  from unknown phone:", device_id)
        elif device.get("name"):
            print("  from phone:", device["name"])
        else:
            print("  from unnamed phone:", device_id)
    # the normalized number, when available, replaces the raw one
    if "normalized_number" in origin:
        print("  from number (normalized):", origin["normalized_number"])
    elif "number" in origin:
        print("  from number:", origin["number"])
    if "name" in origin:
        print("  from name:", origin["name"])

    destinations = (
        ("called_did", "  to DID:"),
        ("called_extension", "  to extension:"),
        ("called_number", "  to number:"),
    )
    for key, label in destinations:
        if key in event:
            print(label, event[key])

    many = event.get("targets")
    if many is not None:
        for position, entry in enumerate(many, 1):
            print("  Target #%i:" % (position,))
            dump_target(entry, users, phones)
        return
    # no "targets" list: fall back to the single "target" entry
    single = event.get("target")
    if single:
        print("  Target:")
        dump_target(single, users, phones)

def main():
    """Run the presence monitor.

    Parses the command line, prompts for any missing credentials, prints the
    current state of all users, then polls the PBX "events" endpoint forever
    and prints every event received.

    Raises:
        IOError / ValueError: propagated from api_call() once it gives up
            retrying a request.
    """
    # The title must be passed as description=; the first positional
    # argument of ArgumentParser is the program name shown in the usage line.
    parser = argparse.ArgumentParser(description="Axeos PBX API example")
    parser.add_argument("--tls", "--ssl", action="store_true",
                        help="use TLS")
    parser.add_argument("--disable-certificate-verification", action="store_true",
                        help="disable certificate verification")
    parser.add_argument("--debug", action="store_true",
                        help="Display debug output")
    parser.add_argument("--user",
                        help="PBX user name")
    parser.add_argument("--password",
                        help="PBX user GUI password")
    parser.add_argument("--api-key",
                        help="PBX API key")
    parser.add_argument("pbx_address",
                        help="IP address or host name of a PBX")
    options = parser.parse_args()

    if options.disable_certificate_verification:
        disable_certificate_verification()

    if options.api_key:
        if options.user or options.password:
            parser.error("--api-key cannot be used with --user or --password")
        # an API key is sent as the password of the fixed user name 'apiKey'
        auth = ('apiKey', options.api_key)
    else:
        if not options.user:
            options.user = input("Username? > ")
        if not options.password:
            options.password = getpass("Password? > ")
        auth = (options.user, options.password)

    if options.tls:
        api_url = 'https://{0}/apis/pbx/'.format(options.pbx_address)
    else:
        api_url = 'http://{0}/apis/pbx/'.format(options.pbx_address)

    if options.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    # flush old events
    events = load_events(api_url, auth, limit=1, timeout=0)
    if events:
        last_event = events[0]['seq']
    else:
        last_event = None

    # load user and phone database with current state
    users = load_users(api_url, auth)
    phones = load_phones(api_url, auth)

    dump_users_verbose(users, phones)
    print()

    # Call events that are reported as "<event type>:" followed by the
    # call details.
    call_events = ('incoming call', 'api call', 'internal dial',
                   'call answered')

    while True:
        events = load_events(api_url, auth, last=last_event)
        if events:
            # remember where we are so the next poll only returns newer events
            last_event = events[-1]['seq']
        for event in events:
            event_type = event['event_type']
            if event_type == 'user list changed':
                print('User list changed')
                users = load_users(api_url, auth)
                dump_users(users, phones)
            elif event_type == 'phone list changed':
                print('Phone list changed')
                phones = load_phones(api_url, auth)
                dump_users(users, phones)
            elif event_type == 'phone status update':
                phone_uuid = event['phone_uuid']
                if phone_uuid not in phones:
                    print('status change for unknown phone:', phone_uuid)
                else:
                    phone = phones[phone_uuid]
                    phone['status'] = event['status']
                    phone['status_code'] = event['status_code']
                    for user in users.values():
                        if user['phone'] == phone_uuid:
                            dump_user(user, phones)
            elif event_type == 'user logged in':
                user_uuid = event['user_uuid']
                if user_uuid not in users:
                    print('unknown user logged in:', user_uuid)
                else:
                    user = users[user_uuid]
                    user['phone'] = event['phone_uuid']
                    # dump_user() needs the whole phone table, not one phone
                    dump_user(user, phones)
            elif event_type == 'user logged out':
                user_uuid = event['user_uuid']
                if user_uuid not in users:
                    print('unknown user logged out:', user_uuid)
                else:
                    user = users[user_uuid]
                    user['phone'] = user['primary_phone']
                    dump_user(user, phones)
            elif event_type in call_events:
                print('%s:' % (event_type,))
                dump_call(event, users, phones)
            elif event_type == 'call finished':
                print('call finished (%s):' % (event["reason"],))
                dump_call(event, users, phones)
            else:
                print('unknown event:', event_type)
            print()

# Script entry point: run the monitor until it is interrupted or fails.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Interrupted by the user: end the current output line and exit
        # without a traceback.
        print()
    except IOError:
        # e.g. api_call() re-raising after its last retry; it has already
        # printed the error itself.
        print("Aborting!")


Was this article helpful?
No