# -*- coding: utf-8 -*-
import os
import posixpath
import time
import uuid

import oss2
import requests


class MyOSS:
    """Convenience wrapper around one Aliyun OSS bucket.

    Every object is stored under the ``root_name`` folder of the bucket and
    every upload method returns the public URL of the stored object, or an
    empty string when the upload did not finish with HTTP status 200.
    """

    def __init__(self):
        """Create the authenticated ``oss2.Bucket`` handle and URL settings."""
        # SECURITY NOTE(review): these credentials are committed in source.
        # They should be rotated and loaded from configuration or the
        # environment instead; left unchanged here to preserve behaviour.
        self.AccessKeyID = 'LTAI5t8bioQxGXB1jtVugJcU'
        self.AccessKeySecret = 'OdGWSBRjkJxaPjgmE38eQ8nzkI6nRk'
        self.bucket_name = 'hunanwanzhu'
        # Folder (key prefix) under which all uploaded objects are placed.
        self.root_name = 'hnwz'
        self.auth = oss2.Auth(self.AccessKeyID, self.AccessKeySecret)
        self.endpoint = 'http://oss-cn-shenzhen.aliyuncs.com'
        self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name)
        # Base of the URLs handed back to callers.
        self.domain = 'http://hunanwanzhu.oss-cn-shenzhen.aliyuncs.com'
        # ``x-oss-process`` query string (auto-orient, quality and text
        # watermark parameters) appended to returned URLs.
        self.watermark = '?x-oss-process=image/auto-orient,1/quality,q_90/watermark,text_6aG95Li75p2v5a6e55uY5aSn6LWb,color_ff0000,size_50,rotate_330,g_center,t_70,x_10,y_10'

    def _object_key(self, name):
        """Return the bucket key for *name*: ``<root_name>/<name>``."""
        # Keys and URLs always use '/', so join with posixpath rather than
        # os.path (which would emit backslashes on Windows).  A leading '/'
        # is stripped because join() would otherwise discard the root prefix.
        return posixpath.join(self.root_name, name.lstrip('/'))

    def _result_url(self, result, key, watermark):
        """Translate an upload result into the URL returned to the caller.

        Returns '' unless ``result.status`` is 200; otherwise the public URL
        of *key*, with the watermark query appended when *watermark* is true.
        """
        if result.status != 200:
            return ''
        url = posixpath.join(self.domain, key)
        if watermark:
            return url + self.watermark
        return url

    def upload_from_str(self, content_str=None, filename=None, watermark=True):
        """Upload in-memory content (bytes, unicode or str).

        :param content_str: the data to store.
        :param filename: object name relative to ``root_name``; required
            despite the ``None`` default.
        :param watermark: append the watermark query to the returned URL.
        :return: URL of the stored object, or '' on failure.
        """
        key = self._object_key(filename)
        result = self.bucket.put_object(key, content_str)
        return self._result_url(result, key, watermark)

    def upload_from_local(self, localfile=None, ossfile=None, watermark=True):
        """Upload a local file to OSS.

        :param localfile: path of the file to read (opened in binary mode).
        :param ossfile: object name relative to ``root_name``.
        :param watermark: append the watermark query to the returned URL
            (default keeps the previous always-on behaviour).
        :return: URL of the stored object, or '' on failure.
        """
        key = self._object_key(ossfile)
        with open(localfile, 'rb') as fileobj:
            result = self.bucket.put_object(key, fileobj)
        return self._result_url(result, key, watermark)

    def resumable_upload_from_local(self, localfile=None, ossfile=None,
                                    watermark=False):
        """Upload a local file with ``oss2.resumable_upload``.

        Upload progress is checkpointed in a ``ResumableStore`` rooted at
        ``/tmp``; the multipart threshold and part size are both 100 KiB and
        four threads are used.

        :param localfile: path of the file to upload.
        :param ossfile: object name relative to ``root_name``.
        :param watermark: append the watermark query to the returned URL
            (default ``False`` keeps the previous plain-URL behaviour).
        :return: URL of the stored object, or '' on failure.
        """
        key = self._object_key(ossfile)
        result = oss2.resumable_upload(
            self.bucket, key, localfile,
            store=oss2.ResumableStore(root='/tmp'),
            multipart_threshold=100 * 1024,
            part_size=100 * 1024,
            num_threads=4,
        )
        return self._result_url(result, key, watermark)

    def upload_from_url(self, url=None, ossfile=None, watermark=True):
        """Download *url* and store the response body in OSS.

        :param url: HTTP(S) address of the content to copy.
        :param ossfile: object name relative to ``root_name``.
        :param watermark: append the watermark query to the returned URL
            (default keeps the previous always-on behaviour).
        :return: URL of the stored object, or '' when either the download or
            the upload failed.
        """
        key = self._object_key(ossfile)
        resp = requests.get(url)
        if not resp.ok:
            # Do not store an HTTP error page and report it as a success.
            return ''
        # The response object itself is handed to put_object as the body.
        result = self.bucket.put_object(key, resp)
        return self._result_url(result, key, watermark)


# Shared module-level client for importers of this module.
hnoss = MyOSS()


if __name__ == '__main__':
    # Manual smoke test: resumable upload of a local text file.
    myoss = MyOSS()
    localfile = '/tmp/test.txt'
    ossfile = 'test/test.txt'
    url = myoss.resumable_upload_from_local(localfile, ossfile)
    print(url)