209 lines
7.7 KiB
Python
209 lines
7.7 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""
|
|
test_object
|
|
----------------------------------
|
|
|
|
Functional tests for `shade` object methods.
|
|
"""
|
|
|
|
import random
|
|
import string
|
|
import tempfile
|
|
|
|
from testtools import content
|
|
|
|
from openstack.cloud import exc
|
|
from openstack.tests.functional import base
|
|
|
|
|
|
class TestObject(base.BaseFunctionalTest):
|
|
def setUp(self):
|
|
super(TestObject, self).setUp()
|
|
if not self.user_cloud.has_service('object-store'):
|
|
self.skipTest('Object service not supported by cloud')
|
|
|
|
def test_create_object(self):
|
|
'''Test uploading small and large files.'''
|
|
container_name = self.getUniqueString('container')
|
|
self.addDetail('container', content.text_content(container_name))
|
|
self.addCleanup(self.user_cloud.delete_container, container_name)
|
|
self.user_cloud.create_container(container_name)
|
|
container = self.user_cloud.get_container(container_name)
|
|
self.assertEqual(container_name, container.name)
|
|
self.assertEqual(
|
|
[], self.user_cloud.list_containers(prefix='somethin')
|
|
)
|
|
sizes = (
|
|
(64 * 1024, 1), # 64K, one segment
|
|
(64 * 1024, 5), # 64MB, 5 segments
|
|
)
|
|
for size, nseg in sizes:
|
|
segment_size = int(round(size / nseg))
|
|
with tempfile.NamedTemporaryFile() as fake_file:
|
|
fake_content = ''.join(
|
|
random.SystemRandom().choice(
|
|
string.ascii_uppercase + string.digits
|
|
)
|
|
for _ in range(size)
|
|
).encode('latin-1')
|
|
|
|
fake_file.write(fake_content)
|
|
fake_file.flush()
|
|
name = 'test-%d' % size
|
|
self.addCleanup(
|
|
self.user_cloud.delete_object, container_name, name
|
|
)
|
|
self.user_cloud.create_object(
|
|
container_name,
|
|
name,
|
|
fake_file.name,
|
|
segment_size=segment_size,
|
|
metadata={'foo': 'bar'},
|
|
)
|
|
self.assertFalse(
|
|
self.user_cloud.is_object_stale(
|
|
container_name, name, fake_file.name
|
|
)
|
|
)
|
|
self.assertEqual(
|
|
'bar',
|
|
self.user_cloud.get_object_metadata(container_name, name)[
|
|
'foo'
|
|
],
|
|
)
|
|
self.user_cloud.update_object(
|
|
container=container_name,
|
|
name=name,
|
|
metadata={'testk': 'testv'},
|
|
)
|
|
self.assertEqual(
|
|
'testv',
|
|
self.user_cloud.get_object_metadata(container_name, name)[
|
|
'testk'
|
|
],
|
|
)
|
|
try:
|
|
self.assertIsNotNone(
|
|
self.user_cloud.get_object(container_name, name)
|
|
)
|
|
except exc.OpenStackCloudException as e:
|
|
self.addDetail(
|
|
'failed_response',
|
|
content.text_content(str(e.response.headers)),
|
|
)
|
|
self.addDetail(
|
|
'failed_response', content.text_content(e.response.text)
|
|
)
|
|
self.assertEqual(
|
|
name, self.user_cloud.list_objects(container_name)[0]['name']
|
|
)
|
|
self.assertEqual(
|
|
[], self.user_cloud.list_objects(container_name, prefix='abc')
|
|
)
|
|
self.assertTrue(
|
|
self.user_cloud.delete_object(container_name, name)
|
|
)
|
|
self.assertEqual([], self.user_cloud.list_objects(container_name))
|
|
self.assertEqual(
|
|
container_name, self.user_cloud.get_container(container_name).name
|
|
)
|
|
self.user_cloud.delete_container(container_name)
|
|
|
|
def test_download_object_to_file(self):
|
|
'''Test uploading small and large files.'''
|
|
container_name = self.getUniqueString('container')
|
|
self.addDetail('container', content.text_content(container_name))
|
|
self.addCleanup(self.user_cloud.delete_container, container_name)
|
|
self.user_cloud.create_container(container_name)
|
|
self.assertEqual(
|
|
container_name, self.user_cloud.list_containers()[0]['name']
|
|
)
|
|
sizes = (
|
|
(64 * 1024, 1), # 64K, one segment
|
|
(64 * 1024, 5), # 64MB, 5 segments
|
|
)
|
|
for size, nseg in sizes:
|
|
fake_content = ''
|
|
segment_size = int(round(size / nseg))
|
|
with tempfile.NamedTemporaryFile() as fake_file:
|
|
fake_content = ''.join(
|
|
random.SystemRandom().choice(
|
|
string.ascii_uppercase + string.digits
|
|
)
|
|
for _ in range(size)
|
|
).encode('latin-1')
|
|
|
|
fake_file.write(fake_content)
|
|
fake_file.flush()
|
|
name = 'test-%d' % size
|
|
self.addCleanup(
|
|
self.user_cloud.delete_object, container_name, name
|
|
)
|
|
self.user_cloud.create_object(
|
|
container_name,
|
|
name,
|
|
fake_file.name,
|
|
segment_size=segment_size,
|
|
metadata={'foo': 'bar'},
|
|
)
|
|
self.assertFalse(
|
|
self.user_cloud.is_object_stale(
|
|
container_name, name, fake_file.name
|
|
)
|
|
)
|
|
self.assertEqual(
|
|
'bar',
|
|
self.user_cloud.get_object_metadata(container_name, name)[
|
|
'foo'
|
|
],
|
|
)
|
|
self.user_cloud.update_object(
|
|
container=container_name,
|
|
name=name,
|
|
metadata={'testk': 'testv'},
|
|
)
|
|
self.assertEqual(
|
|
'testv',
|
|
self.user_cloud.get_object_metadata(container_name, name)[
|
|
'testk'
|
|
],
|
|
)
|
|
try:
|
|
with tempfile.NamedTemporaryFile() as fake_file:
|
|
self.user_cloud.get_object(
|
|
container_name, name, outfile=fake_file.name
|
|
)
|
|
downloaded_content = open(fake_file.name, 'rb').read()
|
|
self.assertEqual(fake_content, downloaded_content)
|
|
except exc.OpenStackCloudException as e:
|
|
self.addDetail(
|
|
'failed_response',
|
|
content.text_content(str(e.response.headers)),
|
|
)
|
|
self.addDetail(
|
|
'failed_response', content.text_content(e.response.text)
|
|
)
|
|
raise
|
|
self.assertEqual(
|
|
name, self.user_cloud.list_objects(container_name)[0]['name']
|
|
)
|
|
self.assertTrue(
|
|
self.user_cloud.delete_object(container_name, name)
|
|
)
|
|
self.assertEqual([], self.user_cloud.list_objects(container_name))
|
|
self.assertEqual(
|
|
container_name, self.user_cloud.list_containers()[0]['name']
|
|
)
|
|
self.user_cloud.delete_container(container_name)
|