Common Operations

Connect to Cloud

import py115
import py115.types

# Build the credential first; the empty strings are placeholders to fill in
credential = py115.types.Credential(uid='', cid='', seid='')
# Connect to the cloud using that credential
cloud = py115.connect(credential=credential)
# Handle for the storage service (used for listing, download and upload)
storage = cloud.storage()
# Handle for the offline service (used for offline tasks)
offline = cloud.offline()

List Files

# Enumerate the root directory (dir_id '0') and print every entry's ID and name
root_entries = storage.list(dir_id='0')
for entry in root_entries:
    print(f'File: ID={entry.file_id}, Name={entry.name}')

Download File

import subprocess

# Request a download ticket for the file identified by its pickcode
ticket = storage.request_download(pickcode='pickcode-of-file')
# Download via curl command.
# --fail makes curl exit with a non-zero status on an HTTP error instead of
# writing the server's error page into the output file.
args = [
    'curl', '--fail', ticket.url,
    '-o', ticket.file_name,
]
# Forward every header carried by the ticket as a curl -H option
for name, value in ticket.headers.items():
    args.extend([
        '-H', f'{name}: {value}'
    ])
# check=True raises subprocess.CalledProcessError when curl fails, so a broken
# download is reported instead of being silently ignored.
subprocess.run(args, check=True)

Upload File

# Local file that should be uploaded
file_path = '/path/to/local-file'

# Ask the storage service for an upload ticket targeting directory '0'
ticket = storage.request_upload(dir_id='0', file_path=file_path)

if ticket is None:
    # No ticket was returned
    print('Request upload failed!')
elif ticket.is_done:
    # The ticket is already marked as done, so no data is sent
    print('File has been imported to your cloud!')
else:
    # The file content is sent with "aliyun-oss-python-sdk"
    import oss2
    # Bucket object authenticated with the token carried by the ticket
    bucket = oss2.Bucket(
        auth=oss2.StsAuth(**ticket.oss_token),
        endpoint=ticket.oss_endpoint,
        bucket_name=ticket.bucket_name,
    )
    # Send the file; for big files this call may take a long time
    put_result = bucket.put_object_from_file(
        key=ticket.object_key,
        filename=file_path,
        headers=ticket.headers,  # DO NOT forget this!!!
    )
    # Decode the JSON body of the response and show it
    result = put_result.resp.response.json()
    print(f'Upload result: {result!r}')

List Tasks

# Print the ID and name of every offline task
all_tasks = offline.list()
for job in all_tasks:
    print(f'Task: ID={job.task_id}, Name={job.name}')

Add Task

# Create offline tasks from download links.
# Supported link types: HTTP/HTTPS/FTP/magnet/ed2k
download_links = (
    'ed2k://ed2k-file-link',
    'magnet:?xt=urn:btih:magnet-file-link',
)
offline.add_url(*download_links)