1 | import os |
---|
2 | from shutil import copyfile |
---|
3 | from unittest.mock import patch, Mock |
---|
4 | |
---|
5 | import pytest |
---|
6 | |
---|
7 | from ow import migrate |
---|
8 | |
---|
9 | |
---|
class TestMigrateUnit(object):
    """
    Unit tests for ow.migrate in which every collaborator (pyramid
    bootstrap/prepare, the ZODB connection, commit, ...) is replaced by
    a mock, so only the wiring inside the module is exercised.
    """

    def test_includeme(self):
        """includeme() must scan the ow.migrate module on the config."""
        config = Mock()
        migrate.includeme(config)
        config.scan.assert_called_with('ow.migrate')

    @patch('ow.migrate.run_migrations')
    def test_closer_wrapper_ok(self, run):
        """
        closer_wrapper() runs the migrations with the request and root
        taken from the env, using "<root_factory module>.migrations" as
        the migrations package, and then calls the env closer.
        """
        closer = Mock()
        env = dict(
            registry=Mock(settings={}),
            root_factory=Mock(__module__='mytest'),
            request=1,
            root=2,
            closer=closer)
        migrate.closer_wrapper(env)
        run.assert_called_with(1, 2, 'mytest.migrations')
        assert closer.called

    # NOTE: stacked patch decorators are applied bottom-up, so the mock
    # for the decorator closest to the function is the first argument.
    @patch('ow.migrate.closer_wrapper')
    @patch('ow.migrate.prepare')
    def test_application_created_ok(self, prepare, wrap):
        """application_created() calls prepare() and closer_wrapper()."""
        event = Mock()
        migrate.application_created(event)
        assert prepare.called
        assert wrap.called

    @patch('ow.migrate.output')
    def test_command_line_no_conf(self, pr):
        """
        Without a config file argument the command line entry point
        returns 1 and reports something through output().
        """
        ret = migrate.command_line_main(['test.py'])
        assert ret == 1
        assert pr.called

    @patch('ow.migrate.closer_wrapper')
    @patch('ow.migrate.bootstrap')
    def test_command_line_no(self, bs, wrap):
        """
        With a config file argument the command line entry point
        bootstraps the app, calls closer_wrapper() and returns 0.
        """
        ret = migrate.command_line_main(['test.py', 'dev.ini'])
        assert ret == 0
        assert bs.called
        assert wrap.called

    @patch('ow.migrate.commit')
    @patch('ow.migrate.get_connection')
    def test_reset_version(self, pget_connection, pcommit):
        """
        reset_version() stores the given version in the ZODB root under
        'database_version' and commits.
        """
        # A plain dict stands in for the ZODB root object
        zodb = {}
        pget_connection.return_value.root.return_value = zodb
        migrate.reset_version('myrequest', 25)
        assert zodb == {'database_version': 25}
        # This used to be spelled "asert_called_with", which on a Mock is
        # not an assertion at all (it silently creates a child mock on
        # older Python versions and raises AttributeError on newer ones),
        # so the commit was never actually verified.
        pcommit.assert_called_with()
---|
59 | |
---|
60 | |
---|
def cleanup():
    """
    Remove the files the test suite below leaves behind in the
    "migrations" directory next to this module: anything with ".pyc" or
    "fail" in its name, plus the generated 3.py migration.
    """
    target_dir = os.path.join(os.path.dirname(__file__), 'migrations')
    leftovers = [
        name for name in os.listdir(target_dir)
        if name == '3.py' or 'fail' in name or '.pyc' in name
    ]
    for name in leftovers:
        os.remove(os.path.join(target_dir, name))
---|
69 | |
---|
70 | |
---|
class mocked_get_connection(object):
    """
    Minimal stand-in for the connection object that
    pyramid_zodbconn.get_connection() returns, used by the
    run_migrations tests below.

    Only root() is provided; it returns a fresh dict whose
    'database_version' key holds the value given at construction time.
    """
    def __init__(self, versions=0):
        # Version number reported as already applied to the database
        self.versions = versions

    def root(self):
        fake_root = dict(database_version=self.versions)
        return fake_root
---|
81 | |
---|
82 | |
---|
class TestsMigrate(object):
    """
    Tests for ow.migrate that run against the sample migrations package
    shipped next to this module (ow.tests.migrations).

    Some tests create extra files inside that package; they always call
    cleanup() afterwards (even on failure) so one test cannot leak files
    into another.
    """

    package_name = 'ow.tests.migrations'
    # Directory that holds the sample migration modules
    ini_path = os.path.join(os.path.dirname(__file__), 'migrations')

    def test_get_indexes_ok(self):
        """The sample package exposes exactly two migrations."""
        indexes = migrate.get_indexes(self.package_name)
        assert isinstance(indexes, list)
        assert len(indexes) == 2

    def test_get_indexes_fail(self):
        """Asking for a package that cannot be imported raises."""
        migrate.get_indexes(self.package_name)
        with pytest.raises(ImportError):
            migrate.get_indexes('nonexistent.module.migrations')

    def test_get_indexes_invalid(self):
        """
        A migration file with an invalid name does not show up in the
        list of indexes.
        """
        # Create a new migration file with an invalid name. get_indexes
        # is expected to skip it (presumably by handling the ValueError
        # the name causes internally - confirm against ow.migrate), so
        # only the two valid migrations are reported.
        copyfile(os.path.join(self.ini_path, '1.py'),
                 os.path.join(self.ini_path, 'fail.py'))
        try:
            indexes = migrate.get_indexes(self.package_name)
        finally:
            # Do not leave fail.py around for the other tests
            cleanup()
        assert isinstance(indexes, list)
        assert len(indexes) == 2

    def test_get_max_in_max_cache(self):
        """A version stored in MAX_CACHE is returned as the max."""
        with patch.dict(migrate.MAX_CACHE, {self.package_name: 10}):
            max_version = migrate.get_max(self.package_name)
            assert max_version == 10

    def test_get_max(self):
        """The max version of the sample package is 2."""
        max_version = migrate.get_max(self.package_name)
        assert max_version == 2

    def test_version(self):
        """set_version() stores the version under 'database_version'."""
        # instead of a real ZODB root, we use a simple dict here,
        # it should be enough for what we need to test.
        root = {}
        root = migrate.set_version(root, 10)
        assert root['database_version'] == 10

    def test_max_version(self):
        """set_max_version() stores the package max version (2)."""
        # instead of a real ZODB root, we use a simple dict here,
        # it should be enough for what we need to test.
        root = {}
        root = migrate.set_max_version(root, self.package_name)
        assert root['database_version'] == 2

    # NOTE: stacked patch decorators are applied bottom-up, hence the
    # (pr2, pr1, gc) argument order.
    @patch('ow.migrate.get_connection')
    @patch('ow.tests.migrations.1.output')
    @patch('ow.tests.migrations.2.output')
    def test_run_all_migrations(self, pr2, pr1, gc):
        """
        Test that all migrations apply
        """
        gc.return_value = mocked_get_connection()
        try:
            migrate.run_migrations(None, {}, self.package_name)
        finally:
            cleanup()
        assert pr1.called
        assert pr2.called

    @patch('ow.migrate.get_connection')
    def test_run_no_migrations(self, gc):
        """
        Test that there are no more migrations to apply
        """
        gc.return_value = mocked_get_connection(versions=2)
        migrate.run_migrations(None, {}, self.package_name)

    @patch('ow.migrate.get_connection')
    @patch('ow.tests.migrations.1.output')
    @patch('ow.tests.migrations.2.output')
    def test_run_invalid_migrations(self, pr2, pr1, gc):
        """
        Test what happens if a migration does not contain the proper
        migrate method
        """
        invalid_path = os.path.join(self.ini_path, '3.py')
        try:
            # No newlines are written on purpose: the whole file ends up
            # being a single comment line, i.e. a valid module that has
            # no migrate method at all.
            with open(invalid_path, 'w') as invalid_migration:
                invalid_migration.write(
                    '# This is an empty migration, just for tests')
                invalid_migration.write('def no_migrate_method_here():')
                invalid_migration.write('    print "Nothing to see here!"')
            gc.return_value = mocked_get_connection(versions=0)
            migrate.run_migrations(None, {}, self.package_name)
        finally:
            # Remove 3.py even if run_migrations blows up
            cleanup()
        assert pr1.called
        assert pr2.called
---|