# -*- Mode: Python; -*-
# Package : omniORBpy
# any.py Created on: 2002/09/16
# Author : Duncan Grisby (dgrisby)
#
# Copyright (C) 2002 Apasphere Ltd
#
# This file is part of the omniORBpy library
#
# The omniORBpy library is free software; you can redistribute it
# and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation;
# either version 2.1 of the License, or (at your option) any later
# version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free
# Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
# MA 02111-1307, USA
#
#
# Description:
# Utility functions for working with Anys.
"""
omniORB.any module -- Any support functions.
to_any(data) -- try to coerce data to an Any.
from_any(any, keep_structs=0) -- return any's contents as plain Python
objects. If keep_structs is true,
CORBA structs are kept as Python class
instances; if false, they are expanded
to dictionaries.
"""
from types import *
import omniORB
import CORBA, tcInternal
import random
__all__ = ["to_any", "from_any"]
# Counter for generating repoIds. Ideally, should not clash with other
# IDs, but this will do for now...
random.seed()
_idbase = "%08x" % random.randrange(0, 0x7fffffff)
_idcount = 0
# Fudge things for Pythons without unicode / bool
try:
UnicodeType
except NameError:
class UnicodeType:
pass
try:
BooleanType
except NameError:
class BooleanType:
pass
try:
bool(1)
except:
def bool(x): return x
# Fixed type
_f = CORBA.fixed(0)
FixedType = type(_f)
def to_any(data):
"""to_any(data) -- try to return data as a CORBA.Any"""
tc, val = _to_tc_value(data)
return CORBA.Any(tc, val)
def _to_tc_value(data):
"""_to_tc_value(data) -- return TypeCode and value for Any insertion"""
if data is None:
return CORBA.TC_null, None
elif isinstance(data, StringType):
return CORBA.TC_string, data
elif isinstance(data, UnicodeType):
return CORBA.TC_wstring, data
elif isinstance(data, BooleanType):
return CORBA.TC_boolean, data
elif isinstance(data, IntType) or isinstance(data, LongType):
if -2147483648L <= data <= 2147483647L:
return CORBA.TC_long, int(data)
elif 0 <= data <= 4294967295L:
return CORBA.TC_ulong, data
elif -9223372036854775808L <= data <= 9223372036854775807L:
return CORBA.TC_longlong, data
elif 0 <= data <= 18446744073709551615L:
return CORBA.TC_ulonglong, data
else:
raise CORBA.BAD_PARAM(omniORB.BAD_PARAM_PythonValueOutOfRange,
CORBA.COMPLETED_NO)
elif isinstance(data, FloatType):
return CORBA.TC_double, data
elif isinstance(data, ListType):
if data == []:
tc = tcInternal.createTypeCode((tcInternal.tv_sequence,
tcInternal.tv_any, 0))
return tc, data
d0 = data[0]
if isinstance(d0, StringType):
for d in data:
if not isinstance(d, StringType):
break
else:
# List of strings
tc = tcInternal.createTypeCode((tcInternal.tv_sequence,
CORBA.TC_string._d, 0))
return tc, data
elif isinstance(d0, UnicodeType):
for d in data:
if not isinstance(d, UnicodeType):
break
else:
# List of wstrings
tc = tcInternal.createTypeCode((tcInternal.tv_sequence,
CORBA.TC_wstring._d, 0))
return tc, data
elif isinstance(d0, BooleanType):
for d in data:
if not isinstance(d, BooleanType):
break
else:
tc = tcInternal.createTypeCode((tcInternal.tv_sequence,
tcInternal.tv_boolean, 0))
return tc, data
elif isinstance(d0, IntType) or isinstance(d0, LongType):
# Numeric. Try to find a numeric type suitable for the whole list
min_v = max_v = 0
for d in data:
if not (isinstance(d, IntType) or isinstance(d, LongType)):
break
if d < min_v: min_v = d
if d > max_v: max_v = d
else:
if min_v >= -2147483648L and max_v <= 2147483647L:
tc = tcInternal.createTypeCode((tcInternal.tv_sequence,
tcInternal.tv_long, 0))
return tc, map(int, data)
elif min_v >= 0 and max_v <= 4294967295L:
tc = tcInternal.createTypeCode((tcInternal.tv_sequence,
tcInternal.tv_ulong, 0))
return tc, map(long, data)
elif (min_v >= -9223372036854775808L and
max_v <= 9223372036854775807L):
tc = tcInternal.createTypeCode((tcInternal.tv_sequence,
tcInternal.tv_longlong, 0))
return tc, map(long, data)
elif min_v >= 0 and max_v <= 18446744073709551615L:
tc = tcInternal.createTypeCode((tcInternal.tv_sequence,
tcInternal.tv_ulonglong,0))
return tc, map(long, data)
else:
raise CORBA.BAD_PARAM(
omniORB.BAD_PARAM_PythonValueOutOfRange,
CORBA.COMPLETED_NO)
elif isinstance(d0, FloatType):
for d in data:
if not isinstance(d, FloatType):
break
else:
# List of doubles
tc = tcInternal.createTypeCode((tcInternal.tv_sequence,
tcInternal.tv_double, 0))
return tc, data
elif isinstance(d0, CORBA.Any):
for d in data:
if not isinstance(d, CORBA.Any):
break
else:
# List of anys
tc = tcInternal.createTypeCode((tcInternal.tv_sequence,
tcInternal.tv_any, 0))
return tc, data
# Generic list
tc = tcInternal.createTypeCode((tcInternal.tv_sequence,
tcInternal.tv_any, 0))
any_list = map(to_any, data)
atc = any_list[0]._t
for a in any_list:
if not a._t.equivalent(atc):
break
else:
tc = tcInternal.createTypeCode((tcInternal.tv_sequence, atc._d, 0))
for i in range(len(any_list)):
any_list[i] = any_list[i]._v
return tc, any_list
elif isinstance(data, TupleType):
return _to_tc_value(list(data))
elif isinstance(data, DictType):
# Represent dictionaries as structs
global _idcount
_idcount = _idcount + 1
id = "omni:%s:%08x" % (_idbase, _idcount)
dl = [tcInternal.tv_struct, None, id, ""]
ms = []
svals = []
items = data.items()
for (k,v) in items:
if not isinstance(k, StringType):
raise CORBA.BAD_PARAM(omniORB.BAD_PARAM_WrongPythonType,
CORBA.COMPLETED_NO)
t, v = _to_tc_value(v)
ms.append(k)
dl.append(k)
dl.append(t._d)
svals.append(v)
cls = omniORB.createUnknownStruct(id, ms)
dl[1] = cls
tc = tcInternal.createTypeCode(tuple(dl))
value = apply(cls, svals)
return tc, value
elif isinstance(data, FixedType):
if data == CORBA.fixed(0):
tc = tcInternal.createTypeCode((tcInternal.tv_fixed, 1, 0))
else:
tc = tcInternal.createTypeCode((tcInternal.tv_fixed,
data.precision(), data.decimals()))
return tc, data
elif isinstance(data, CORBA.Any):
return data._t, data._v
elif isinstance(data, omniORB.EnumItem):
return omniORB.findTypeCode(data._parent_id), data
elif hasattr(data, "_NP_RepositoryId"):
return omniORB.findTypeCode(data._NP_RepositoryId), data
raise CORBA.BAD_PARAM(omniORB.BAD_PARAM_WrongPythonType,
CORBA.COMPLETED_NO)
def from_any(any, keep_structs=0):
"""from_any(any, keep_structs=0) -- extract the data from an Any.
If keep_structs is true, CORBA structs are left as Python
instances; if false, structs are turned into dictionaries with
string keys.
"""
if not isinstance(any, CORBA.Any):
raise CORBA.BAD_PARAM(omniORB.BAD_PARAM_WrongPythonType,
CORBA.COMPLETED_NO)
return _from_desc_value(any._t._d, any._v, keep_structs)
def _from_desc_value(desc, value, keep_structs=0):
"""_from_desc_value(desc,val,keep_structs) -- de-Any value"""
if type(desc) is IntType:
if desc == tcInternal.tv_boolean:
return bool(value)
elif desc != tcInternal.tv_any:
# Nothing to do
return value
else:
k = desc
else:
k = desc[0]
while k == tcInternal.tv_alias:
desc = desc[3]
if type(desc) is IntType:
if desc == tcInternal.tv_boolean:
return bool(value)
elif desc != tcInternal.tv_any:
return value
else:
k = desc
else:
k = desc[0]
if k == tcInternal.tv_any:
return _from_desc_value(value._t._d, value._v, keep_structs)
elif k == tcInternal.tv_struct:
if keep_structs:
return value
d = {}
for i in range(4, len(desc), 2):
sm = desc[i]
sd = desc[i+1]
d[sm] = _from_desc_value(sd, getattr(value, sm), keep_structs)
return d
elif k in [tcInternal.tv_sequence, tcInternal.tv_array]:
sd = desc[1]
if type(sd) is IntType and sd != tcInternal.tv_any:
return value
else:
rl = []
for i in value:
rl.append(_from_desc_value(sd, i, keep_structs))
return rl
return value
syntax highlighted by Code2HTML, v. 0.9.1