impuls/lib/python3.11/site-packages/openstack/tests/functional/cloud/test_object.py

209 lines
7.7 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
test_object
----------------------------------
Functional tests for `shade` object methods.
"""
import random
import string
import tempfile
from testtools import content
from openstack.cloud import exc
from openstack.tests.functional import base
class TestObject(base.BaseFunctionalTest):
def setUp(self):
super(TestObject, self).setUp()
if not self.user_cloud.has_service('object-store'):
self.skipTest('Object service not supported by cloud')
def test_create_object(self):
'''Test uploading small and large files.'''
container_name = self.getUniqueString('container')
self.addDetail('container', content.text_content(container_name))
self.addCleanup(self.user_cloud.delete_container, container_name)
self.user_cloud.create_container(container_name)
container = self.user_cloud.get_container(container_name)
self.assertEqual(container_name, container.name)
self.assertEqual(
[], self.user_cloud.list_containers(prefix='somethin')
)
sizes = (
(64 * 1024, 1), # 64K, one segment
(64 * 1024, 5), # 64MB, 5 segments
)
for size, nseg in sizes:
segment_size = int(round(size / nseg))
with tempfile.NamedTemporaryFile() as fake_file:
fake_content = ''.join(
random.SystemRandom().choice(
string.ascii_uppercase + string.digits
)
for _ in range(size)
).encode('latin-1')
fake_file.write(fake_content)
fake_file.flush()
name = 'test-%d' % size
self.addCleanup(
self.user_cloud.delete_object, container_name, name
)
self.user_cloud.create_object(
container_name,
name,
fake_file.name,
segment_size=segment_size,
metadata={'foo': 'bar'},
)
self.assertFalse(
self.user_cloud.is_object_stale(
container_name, name, fake_file.name
)
)
self.assertEqual(
'bar',
self.user_cloud.get_object_metadata(container_name, name)[
'foo'
],
)
self.user_cloud.update_object(
container=container_name,
name=name,
metadata={'testk': 'testv'},
)
self.assertEqual(
'testv',
self.user_cloud.get_object_metadata(container_name, name)[
'testk'
],
)
try:
self.assertIsNotNone(
self.user_cloud.get_object(container_name, name)
)
except exc.OpenStackCloudException as e:
self.addDetail(
'failed_response',
content.text_content(str(e.response.headers)),
)
self.addDetail(
'failed_response', content.text_content(e.response.text)
)
self.assertEqual(
name, self.user_cloud.list_objects(container_name)[0]['name']
)
self.assertEqual(
[], self.user_cloud.list_objects(container_name, prefix='abc')
)
self.assertTrue(
self.user_cloud.delete_object(container_name, name)
)
self.assertEqual([], self.user_cloud.list_objects(container_name))
self.assertEqual(
container_name, self.user_cloud.get_container(container_name).name
)
self.user_cloud.delete_container(container_name)
def test_download_object_to_file(self):
'''Test uploading small and large files.'''
container_name = self.getUniqueString('container')
self.addDetail('container', content.text_content(container_name))
self.addCleanup(self.user_cloud.delete_container, container_name)
self.user_cloud.create_container(container_name)
self.assertEqual(
container_name, self.user_cloud.list_containers()[0]['name']
)
sizes = (
(64 * 1024, 1), # 64K, one segment
(64 * 1024, 5), # 64MB, 5 segments
)
for size, nseg in sizes:
fake_content = ''
segment_size = int(round(size / nseg))
with tempfile.NamedTemporaryFile() as fake_file:
fake_content = ''.join(
random.SystemRandom().choice(
string.ascii_uppercase + string.digits
)
for _ in range(size)
).encode('latin-1')
fake_file.write(fake_content)
fake_file.flush()
name = 'test-%d' % size
self.addCleanup(
self.user_cloud.delete_object, container_name, name
)
self.user_cloud.create_object(
container_name,
name,
fake_file.name,
segment_size=segment_size,
metadata={'foo': 'bar'},
)
self.assertFalse(
self.user_cloud.is_object_stale(
container_name, name, fake_file.name
)
)
self.assertEqual(
'bar',
self.user_cloud.get_object_metadata(container_name, name)[
'foo'
],
)
self.user_cloud.update_object(
container=container_name,
name=name,
metadata={'testk': 'testv'},
)
self.assertEqual(
'testv',
self.user_cloud.get_object_metadata(container_name, name)[
'testk'
],
)
try:
with tempfile.NamedTemporaryFile() as fake_file:
self.user_cloud.get_object(
container_name, name, outfile=fake_file.name
)
downloaded_content = open(fake_file.name, 'rb').read()
self.assertEqual(fake_content, downloaded_content)
except exc.OpenStackCloudException as e:
self.addDetail(
'failed_response',
content.text_content(str(e.response.headers)),
)
self.addDetail(
'failed_response', content.text_content(e.response.text)
)
raise
self.assertEqual(
name, self.user_cloud.list_objects(container_name)[0]['name']
)
self.assertTrue(
self.user_cloud.delete_object(container_name, name)
)
self.assertEqual([], self.user_cloud.list_objects(container_name))
self.assertEqual(
container_name, self.user_cloud.list_containers()[0]['name']
)
self.user_cloud.delete_container(container_name)