mirror of
https://github.com/postgres/postgres.git
synced 2025-11-17 00:02:52 -05:00
Add test cases for inline handler of plython2u (when using that language name), and for result object element assignment. There is now at least one test case for every top-level functionality, except plpy.Fatal (annoying to use in regression tests) and result object slice retrieval and slice assignment (which are somewhat broken).
382 lines
9.6 KiB
Plaintext
382 lines
9.6 KiB
Plaintext
--
|
|
-- result objects
|
|
--
|
|
CREATE FUNCTION test_resultobject_access() RETURNS void
|
|
AS $$
|
|
rv = plpy.execute("SELECT fname, lname, username FROM users ORDER BY username")
|
|
plpy.info([row for row in rv])
|
|
rv[1] = dict([(k, v*2) for (k, v) in rv[1].items()])
|
|
plpy.info([row for row in rv])
|
|
$$ LANGUAGE plpythonu;
|
|
SELECT test_resultobject_access();
|
|
INFO: [{'lname': 'doe', 'username': 'j_doe', 'fname': 'jane'}, {'lname': 'doe', 'username': 'johnd', 'fname': 'john'}, {'lname': 'smith', 'username': 'slash', 'fname': 'rick'}, {'lname': 'doe', 'username': 'w_doe', 'fname': 'willem'}]
|
|
CONTEXT: PL/Python function "test_resultobject_access"
|
|
INFO: [{'lname': 'doe', 'username': 'j_doe', 'fname': 'jane'}, {'lname': 'doedoe', 'username': 'johndjohnd', 'fname': 'johnjohn'}, {'lname': 'smith', 'username': 'slash', 'fname': 'rick'}, {'lname': 'doe', 'username': 'w_doe', 'fname': 'willem'}]
|
|
CONTEXT: PL/Python function "test_resultobject_access"
|
|
test_resultobject_access
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
--
|
|
-- nested calls
|
|
--
|
|
CREATE FUNCTION nested_call_one(a text) RETURNS text
|
|
AS
|
|
'q = "SELECT nested_call_two(''%s'')" % a
|
|
r = plpy.execute(q)
|
|
return r[0]'
|
|
LANGUAGE plpythonu ;
|
|
CREATE FUNCTION nested_call_two(a text) RETURNS text
|
|
AS
|
|
'q = "SELECT nested_call_three(''%s'')" % a
|
|
r = plpy.execute(q)
|
|
return r[0]'
|
|
LANGUAGE plpythonu ;
|
|
CREATE FUNCTION nested_call_three(a text) RETURNS text
|
|
AS
|
|
'return a'
|
|
LANGUAGE plpythonu ;
|
|
-- some spi stuff
|
|
CREATE FUNCTION spi_prepared_plan_test_one(a text) RETURNS text
|
|
AS
|
|
'if "myplan" not in SD:
|
|
q = "SELECT count(*) FROM users WHERE lname = $1"
|
|
SD["myplan"] = plpy.prepare(q, [ "text" ])
|
|
try:
|
|
rv = plpy.execute(SD["myplan"], [a])
|
|
return "there are " + str(rv[0]["count"]) + " " + str(a) + "s"
|
|
except Exception, ex:
|
|
plpy.error(str(ex))
|
|
return None
|
|
'
|
|
LANGUAGE plpythonu;
|
|
CREATE FUNCTION spi_prepared_plan_test_nested(a text) RETURNS text
|
|
AS
|
|
'if "myplan" not in SD:
|
|
q = "SELECT spi_prepared_plan_test_one(''%s'') as count" % a
|
|
SD["myplan"] = plpy.prepare(q)
|
|
try:
|
|
rv = plpy.execute(SD["myplan"])
|
|
if len(rv):
|
|
return rv[0]["count"]
|
|
except Exception, ex:
|
|
plpy.error(str(ex))
|
|
return None
|
|
'
|
|
LANGUAGE plpythonu;
|
|
CREATE FUNCTION join_sequences(s sequences) RETURNS text
|
|
AS
|
|
'if not s["multipart"]:
|
|
return s["sequence"]
|
|
q = "SELECT sequence FROM xsequences WHERE pid = ''%s''" % s["pid"]
|
|
rv = plpy.execute(q)
|
|
seq = s["sequence"]
|
|
for r in rv:
|
|
seq = seq + r["sequence"]
|
|
return seq
|
|
'
|
|
LANGUAGE plpythonu;
|
|
-- spi and nested calls
|
|
--
|
|
select nested_call_one('pass this along');
|
|
nested_call_one
|
|
-----------------------------------------------------------------
|
|
{'nested_call_two': "{'nested_call_three': 'pass this along'}"}
|
|
(1 row)
|
|
|
|
select spi_prepared_plan_test_one('doe');
|
|
spi_prepared_plan_test_one
|
|
----------------------------
|
|
there are 3 does
|
|
(1 row)
|
|
|
|
select spi_prepared_plan_test_one('smith');
|
|
spi_prepared_plan_test_one
|
|
----------------------------
|
|
there are 1 smiths
|
|
(1 row)
|
|
|
|
select spi_prepared_plan_test_nested('smith');
|
|
spi_prepared_plan_test_nested
|
|
-------------------------------
|
|
there are 1 smiths
|
|
(1 row)
|
|
|
|
SELECT join_sequences(sequences) FROM sequences;
|
|
join_sequences
|
|
----------------
|
|
ABCDEFGHIJKL
|
|
ABCDEF
|
|
ABCDEF
|
|
ABCDEF
|
|
ABCDEF
|
|
ABCDEF
|
|
(6 rows)
|
|
|
|
SELECT join_sequences(sequences) FROM sequences
|
|
WHERE join_sequences(sequences) ~* '^A';
|
|
join_sequences
|
|
----------------
|
|
ABCDEFGHIJKL
|
|
ABCDEF
|
|
ABCDEF
|
|
ABCDEF
|
|
ABCDEF
|
|
ABCDEF
|
|
(6 rows)
|
|
|
|
SELECT join_sequences(sequences) FROM sequences
|
|
WHERE join_sequences(sequences) ~* '^B';
|
|
join_sequences
|
|
----------------
|
|
(0 rows)
|
|
|
|
--
|
|
-- plan and result objects
|
|
--
|
|
CREATE FUNCTION result_metadata_test(cmd text) RETURNS int
|
|
AS $$
|
|
plan = plpy.prepare(cmd)
|
|
plpy.info(plan.status()) # not really documented or useful
|
|
result = plpy.execute(plan)
|
|
if result.status() > 0:
|
|
plpy.info(result.colnames())
|
|
plpy.info(result.coltypes())
|
|
plpy.info(result.coltypmods())
|
|
return result.nrows()
|
|
else:
|
|
return None
|
|
$$ LANGUAGE plpythonu;
|
|
SELECT result_metadata_test($$SELECT 1 AS foo, '11'::text AS bar UNION SELECT 2, '22'$$);
|
|
INFO: True
|
|
CONTEXT: PL/Python function "result_metadata_test"
|
|
INFO: ['foo', 'bar']
|
|
CONTEXT: PL/Python function "result_metadata_test"
|
|
INFO: [23, 25]
|
|
CONTEXT: PL/Python function "result_metadata_test"
|
|
INFO: [-1, -1]
|
|
CONTEXT: PL/Python function "result_metadata_test"
|
|
result_metadata_test
|
|
----------------------
|
|
2
|
|
(1 row)
|
|
|
|
SELECT result_metadata_test($$CREATE TEMPORARY TABLE foo1 (a int, b text)$$);
|
|
INFO: True
|
|
CONTEXT: PL/Python function "result_metadata_test"
|
|
ERROR: plpy.Error: command did not produce a result set
|
|
CONTEXT: Traceback (most recent call last):
|
|
PL/Python function "result_metadata_test", line 6, in <module>
|
|
plpy.info(result.colnames())
|
|
PL/Python function "result_metadata_test"
|
|
CREATE FUNCTION result_nrows_test(cmd text) RETURNS int
|
|
AS $$
|
|
result = plpy.execute(cmd)
|
|
return result.nrows()
|
|
$$ LANGUAGE plpythonu;
|
|
SELECT result_nrows_test($$SELECT 1$$);
|
|
result_nrows_test
|
|
-------------------
|
|
1
|
|
(1 row)
|
|
|
|
SELECT result_nrows_test($$CREATE TEMPORARY TABLE foo2 (a int, b text)$$);
|
|
result_nrows_test
|
|
-------------------
|
|
0
|
|
(1 row)
|
|
|
|
SELECT result_nrows_test($$INSERT INTO foo2 VALUES (1, 'one'), (2, 'two')$$);
|
|
result_nrows_test
|
|
-------------------
|
|
2
|
|
(1 row)
|
|
|
|
SELECT result_nrows_test($$UPDATE foo2 SET b = '' WHERE a = 2$$);
|
|
result_nrows_test
|
|
-------------------
|
|
1
|
|
(1 row)
|
|
|
|
CREATE FUNCTION result_len_test(cmd text) RETURNS int
|
|
AS $$
|
|
result = plpy.execute(cmd)
|
|
return len(result)
|
|
$$ LANGUAGE plpythonu;
|
|
SELECT result_len_test($$SELECT 1$$);
|
|
result_len_test
|
|
-----------------
|
|
1
|
|
(1 row)
|
|
|
|
SELECT result_len_test($$CREATE TEMPORARY TABLE foo3 (a int, b text)$$);
|
|
result_len_test
|
|
-----------------
|
|
0
|
|
(1 row)
|
|
|
|
SELECT result_len_test($$INSERT INTO foo3 VALUES (1, 'one'), (2, 'two')$$);
|
|
result_len_test
|
|
-----------------
|
|
0
|
|
(1 row)
|
|
|
|
SELECT result_len_test($$UPDATE foo3 SET b= '' WHERE a = 2$$);
|
|
result_len_test
|
|
-----------------
|
|
0
|
|
(1 row)
|
|
|
|
-- cursor objects
|
|
CREATE FUNCTION simple_cursor_test() RETURNS int AS $$
|
|
res = plpy.cursor("select fname, lname from users")
|
|
does = 0
|
|
for row in res:
|
|
if row['lname'] == 'doe':
|
|
does += 1
|
|
return does
|
|
$$ LANGUAGE plpythonu;
|
|
CREATE FUNCTION double_cursor_close() RETURNS int AS $$
|
|
res = plpy.cursor("select fname, lname from users")
|
|
res.close()
|
|
res.close()
|
|
$$ LANGUAGE plpythonu;
|
|
CREATE FUNCTION cursor_fetch() RETURNS int AS $$
|
|
res = plpy.cursor("select fname, lname from users")
|
|
assert len(res.fetch(3)) == 3
|
|
assert len(res.fetch(3)) == 1
|
|
assert len(res.fetch(3)) == 0
|
|
assert len(res.fetch(3)) == 0
|
|
try:
|
|
# use next() or __next__(), the method name changed in
|
|
# http://www.python.org/dev/peps/pep-3114/
|
|
try:
|
|
res.next()
|
|
except AttributeError:
|
|
res.__next__()
|
|
except StopIteration:
|
|
pass
|
|
else:
|
|
assert False, "StopIteration not raised"
|
|
$$ LANGUAGE plpythonu;
|
|
CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$
|
|
res = plpy.cursor("select fname, lname from users order by fname")
|
|
assert len(res.fetch(2)) == 2
|
|
|
|
item = None
|
|
try:
|
|
item = res.next()
|
|
except AttributeError:
|
|
item = res.__next__()
|
|
assert item['fname'] == 'rick'
|
|
|
|
assert len(res.fetch(2)) == 1
|
|
$$ LANGUAGE plpythonu;
|
|
CREATE FUNCTION fetch_after_close() RETURNS int AS $$
|
|
res = plpy.cursor("select fname, lname from users")
|
|
res.close()
|
|
try:
|
|
res.fetch(1)
|
|
except ValueError:
|
|
pass
|
|
else:
|
|
assert False, "ValueError not raised"
|
|
$$ LANGUAGE plpythonu;
|
|
CREATE FUNCTION next_after_close() RETURNS int AS $$
|
|
res = plpy.cursor("select fname, lname from users")
|
|
res.close()
|
|
try:
|
|
try:
|
|
res.next()
|
|
except AttributeError:
|
|
res.__next__()
|
|
except ValueError:
|
|
pass
|
|
else:
|
|
assert False, "ValueError not raised"
|
|
$$ LANGUAGE plpythonu;
|
|
CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$
|
|
res = plpy.cursor("select fname, lname from users where false")
|
|
assert len(res.fetch(1)) == 0
|
|
try:
|
|
try:
|
|
res.next()
|
|
except AttributeError:
|
|
res.__next__()
|
|
except StopIteration:
|
|
pass
|
|
else:
|
|
assert False, "StopIteration not raised"
|
|
$$ LANGUAGE plpythonu;
|
|
CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$
|
|
plan = plpy.prepare(
|
|
"select fname, lname from users where fname like $1 || '%' order by fname",
|
|
["text"])
|
|
for row in plpy.cursor(plan, ["w"]):
|
|
yield row['fname']
|
|
for row in plpy.cursor(plan, ["j"]):
|
|
yield row['fname']
|
|
$$ LANGUAGE plpythonu;
|
|
CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$
|
|
plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'",
|
|
["text"])
|
|
c = plpy.cursor(plan, ["a", "b"])
|
|
$$ LANGUAGE plpythonu;
|
|
SELECT simple_cursor_test();
|
|
simple_cursor_test
|
|
--------------------
|
|
3
|
|
(1 row)
|
|
|
|
SELECT double_cursor_close();
|
|
double_cursor_close
|
|
---------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT cursor_fetch();
|
|
cursor_fetch
|
|
--------------
|
|
|
|
(1 row)
|
|
|
|
SELECT cursor_mix_next_and_fetch();
|
|
cursor_mix_next_and_fetch
|
|
---------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT fetch_after_close();
|
|
fetch_after_close
|
|
-------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT next_after_close();
|
|
next_after_close
|
|
------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT cursor_fetch_next_empty();
|
|
cursor_fetch_next_empty
|
|
-------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT cursor_plan();
|
|
cursor_plan
|
|
-------------
|
|
willem
|
|
jane
|
|
john
|
|
(3 rows)
|
|
|
|
SELECT cursor_plan_wrong_args();
|
|
ERROR: TypeError: Expected sequence of 1 argument, got 2: ['a', 'b']
|
|
CONTEXT: Traceback (most recent call last):
|
|
PL/Python function "cursor_plan_wrong_args", line 4, in <module>
|
|
c = plpy.cursor(plan, ["a", "b"])
|
|
PL/Python function "cursor_plan_wrong_args"
|