From 9c423171944b71528344d7315e5909864c9b653c Mon Sep 17 00:00:00 2001 From: LiHS Date: Mon, 5 Sep 2022 15:41:58 +0800 Subject: [PATCH] support retry expired parts with upload --- src/Qiniu/Http/Error.php | 3 + src/Qiniu/Storage/ResumeUploader.php | 331 ++++++++++++++++++--------- src/Qiniu/functions.php | 2 +- tests/Qiniu/Tests/ResumeUpTest.php | 6 - 4 files changed, 232 insertions(+), 110 deletions(-) diff --git a/src/Qiniu/Http/Error.php b/src/Qiniu/Http/Error.php index 73477cf4..8fba74f1 100644 --- a/src/Qiniu/Http/Error.php +++ b/src/Qiniu/Http/Error.php @@ -10,6 +10,9 @@ final class Error { private $url; + /** + * @var Response + */ private $response; public function __construct($url, $response) diff --git a/src/Qiniu/Storage/ResumeUploader.php b/src/Qiniu/Storage/ResumeUploader.php index 097c836a..39e9deec 100644 --- a/src/Qiniu/Storage/ResumeUploader.php +++ b/src/Qiniu/Storage/ResumeUploader.php @@ -42,18 +42,18 @@ final class ResumeUploader * * @param string $upToken 上传凭证 * @param string $key 上传文件名 - * @param string $inputStream 上传二进制流 - * @param string $size 上传流的大小 - * @param string $params 自定义变量 + * @param resource $inputStream 上传二进制流 + * @param int $size 上传流的大小 + * @param array $params 自定义变量 * @param string $mime 上传数据的mimeType * @param Config $config * @param string $resumeRecordFile 断点续传的已上传的部分信息记录文件 * @param string $version 分片上传版本 目前支持v1/v2版本 默认v1 * @param int $partSize 分片上传v2字段 默认大小为4MB 分片大小范围为1 MB - 1 GB * @param RequestOptions $reqOpt 分片上传v2字段 默认大小为4MB 分片大小范围为1 MB - 1 GB - * @link http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar - * * @throws \Exception + * + * @link http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar */ public function __construct( $upToken, @@ -107,17 +107,15 @@ public function __construct( /** * 上传操作 + * @param $fname string 文件名 + * + * @throws \Exception */ public function upload($fname) { - $uploaded = 0; - if ($this->version == SplitUploadVersion::V2) { - $partNumber = 1; - $encodedObjectName = $this->key ? \Qiniu\base64_urlSafeEncode($this->key) : '~'; - }; + $blkputRets = null; // get upload record from resumeRecordFile if ($this->resumeRecordFile != null) { - $blkputRets = null; if (file_exists($this->resumeRecordFile)) { $stream = fopen($this->resumeRecordFile, 'r'); if ($stream) { @@ -142,64 +140,68 @@ public function upload($fname) } else { error_log("resumeFile not exists"); } + } - if ($blkputRets) { - if ($this->version == SplitUploadVersion::V1) { - if (isset($blkputRets['contexts']) && isset($blkputRets['uploaded']) && - is_array($blkputRets['contexts']) && is_int($blkputRets['uploaded'])) { - $this->contexts = $blkputRets['contexts']; - $uploaded = $blkputRets['uploaded']; - } - } elseif ($this->version == SplitUploadVersion::V2) { - if (isset($blkputRets["etags"]) && isset($blkputRets["uploadId"]) && - isset($blkputRets["expiredAt"]) && $blkputRets["expiredAt"] > time() - && $blkputRets["uploaded"] > 0 && is_array($blkputRets["etags"]) && - is_string($blkputRets["uploadId"]) && is_int($blkputRets["expiredAt"])) { - $this->finishedEtags['etags'] = $blkputRets["etags"]; - $this->finishedEtags["uploadId"] = $blkputRets["uploadId"]; - $this->finishedEtags["expiredAt"] = $blkputRets["expiredAt"]; - $this->finishedEtags["uploaded"] = $blkputRets["uploaded"]; - $uploaded = $blkputRets["uploaded"]; - $partNumber = count($this->finishedEtags["etags"]) + 1; + if ($this->version == SplitUploadVersion::V1) { + return $this->uploadV1($fname, $blkputRets); + } elseif ($this->version == SplitUploadVersion::V2) { + return $this->uploadV2($fname, $blkputRets); + } else { + throw new \Exception("only support v1/v2 now!"); + } + } + + /** + * @param string $fname 文件名 + * @param null|array $blkputRets + * + * @throws \Exception + */ + private function uploadV1($fname, $blkputRets = null) + { + // 尝试恢复恢复已上传的数据 + $isResumeUpload = $blkputRets !== null; + $this->contexts = array(); + + if ($blkputRets) { + if (isset($blkputRets['contexts']) && isset($blkputRets['uploaded']) && + is_array($blkputRets['contexts']) && is_int($blkputRets['uploaded']) + ) { + $this->contexts = array_map(function ($ctx) { + if (is_array($ctx)) { + return $ctx; } else { - $this->makeInitReq($encodedObjectName); + // 兼容旧版本(旧版本没有存储 expireAt) + return array( + "ctx" => $ctx, + "expiredAt" => 0, + ); } - } else { - throw new \Exception("only support v1/v2 now!"); - } - } else { - if ($this->version == SplitUploadVersion::V2) { - $this->makeInitReq($encodedObjectName); - } - } - } else { - // init a Multipart Upload task if choose v2 - if ($this->version == SplitUploadVersion::V2) { - $this->makeInitReq($encodedObjectName); + }, $blkputRets['contexts']); } } + // 上传分片 + $uploaded = 0; while ($uploaded < $this->size) { $blockSize = $this->blockSize($uploaded); + $blockIndex = $uploaded / $this->partSize; + if (!is_int($blockIndex)) { + throw new \Exception("v1 part size changed"); + } + // 如果已上传该分片且没有过期 + if (isset($this->contexts[$blockIndex]) && $this->contexts[$blockIndex]["expiredAt"] >= time()) { + $uploaded += $blockSize; + fseek($this->inputStream, $blockSize, SEEK_CUR); + continue; + } $data = fread($this->inputStream, $blockSize); if ($data === false) { throw new \Exception("file read failed", 1); } - if ($this->version == SplitUploadVersion::V1) { - $crc = \Qiniu\crc32_data($data); - $response = $this->makeBlock($data, $blockSize); - } elseif ($this->version == SplitUploadVersion::V2) { - $md5 = md5($data); - $response = $this->uploadPart( - $data, - $partNumber, - $this->finishedEtags["uploadId"], - $encodedObjectName, - $md5 - ); - } else { - throw new \Exception("only support v1/v2 now!"); - } + $crc = \Qiniu\crc32_data($data); + $response = $this->makeBlock($data, $blockSize); + $ret = null; if ($response->ok() && $response->json() != null) { @@ -217,55 +219,148 @@ public function upload($fname) $this->host = $upHostBackup; } - if ($this->version == SplitUploadVersion::V1) { - if ($response->needRetry() || !isset($ret['crc32']) || $crc != $ret['crc32']) { - $response = $this->makeBlock($data, $blockSize); - $ret = $response->json(); + if ($response->needRetry() || !isset($ret['crc32']) || $crc != $ret['crc32']) { + $response = $this->makeBlock($data, $blockSize); + $ret = $response->json(); + } + if (!$response->ok() || !isset($ret['crc32']) || $crc != $ret['crc32']) { + return array(null, new Error($this->currentUrl, $response)); + } + + // 如果可以在已上传取到说明是过期分片直接修改已上传信息,否则是新的片添加到已上传分片尾部 + if (isset($this->contexts[$blockIndex])) { + $this->contexts[$blockIndex] = array( + 'ctx' => $ret['ctx'], + 'expiredAt' => $ret['expired_at'], + ); + } else { + array_push($this->contexts, array( + 'ctx' => $ret['ctx'], + 'expiredAt' => $ret['expired_at'], + )); + } + $uploaded += $blockSize; + + // 记录断点 + if ($this->resumeRecordFile !== null) { + $recordData = array( + 'contexts' => $this->contexts, + 'uploaded' => $uploaded + ); + $recordData = json_encode($recordData); + + if ($recordData) { + $isWritten = file_put_contents($this->resumeRecordFile, $recordData); + if ($isWritten === false) { + error_log("write resumeRecordFile failed"); + } + } else { + error_log('resumeRecordData encode failed'); } + } + } - if (!$response->ok() || !isset($ret['crc32']) || $crc != $ret['crc32']) { - return array(null, new Error($this->currentUrl, $response)); + // 完成上传 + list($ret, $err) = $this->makeFile($fname); + if ($err !== null) { + $response = $err->getResponse(); + if ($isResumeUpload && $response->statusCode === 701) { + fseek($this->inputStream, 0); + return $this->uploadV1($fname); + } + } + return array($ret, $err); + } + + /** + * @param string $fname 文件名 + * @param null|array $blkputRets + * + * @throws \Exception + */ + private function uploadV2($fname, $blkputRets = null) + { + $uploaded = 0; + $partNumber = 1; + $encodedObjectName = $this->key ? \Qiniu\base64_urlSafeEncode($this->key) : '~'; + + $isResumeUpload = $blkputRets !== null; + if ($blkputRets) { + if (isset($blkputRets["etags"]) && isset($blkputRets["uploadId"]) && + isset($blkputRets["expiredAt"]) && $blkputRets["expiredAt"] > time() && + $blkputRets["uploaded"] > 0 && is_array($blkputRets["etags"]) && + is_string($blkputRets["uploadId"]) && is_int($blkputRets["expiredAt"]) + ) { + $this->finishedEtags['etags'] = $blkputRets["etags"]; + $this->finishedEtags["uploadId"] = $blkputRets["uploadId"]; + $this->finishedEtags["expiredAt"] = $blkputRets["expiredAt"]; + $this->finishedEtags["uploaded"] = $blkputRets["uploaded"]; + $uploaded = $blkputRets["uploaded"]; + $partNumber = count($this->finishedEtags["etags"]) + 1; + } else { + $this->makeInitReq($encodedObjectName); + } + } else { + $this->makeInitReq($encodedObjectName); + } + + fseek($this->inputStream, $uploaded); + while ($uploaded < $this->size) { + $blockSize = $this->blockSize($uploaded); + $data = fread($this->inputStream, $blockSize); + if ($data === false) { + throw new \Exception("file read failed", 1); + } + $md5 = md5($data); + $response = $this->uploadPart( + $data, + $partNumber, + $this->finishedEtags["uploadId"], + $encodedObjectName, + $md5 + ); + + $ret = null; + if ($response->ok() && $response->json() != null) { + $ret = $response->json(); + } + if ($response->statusCode < 0) { + list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($this->upToken); + if ($err != null) { + return array(null, $err); } - array_push($this->contexts, $ret['ctx']); - } elseif ($this->version == SplitUploadVersion::V2) { - if ($response->needRetry() || !isset($ret['md5']) || $md5 != $ret['md5']) { - $response = $this->uploadPart( - $data, - $partNumber, - $this->finishedEtags["uploadId"], - $encodedObjectName, - $md5 - ); - $ret = $response->json(); + list($upHostBackup, $err) = $this->config->getUpBackupHostV2($accessKey, $bucket); + if ($err != null) { + return array(null, $err); } + $this->host = $upHostBackup; + } - if (!$response->ok() || !isset($ret['md5']) || $md5 != $ret['md5']) { - return array(null, new Error($this->currentUrl, $response)); - } - $blockStatus = array('etag' => $ret['etag'], 'partNumber' => $partNumber); - array_push($this->finishedEtags['etags'], $blockStatus); - $partNumber += 1; - } else { - throw new \Exception("only support v1/v2 now!"); + if ($response->needRetry() || !isset($ret['md5']) || $md5 != $ret['md5']) { + $response = $this->uploadPart( + $data, + $partNumber, + $this->finishedEtags["uploadId"], + $encodedObjectName, + $md5 + ); + $ret = $response->json(); + } + if ($isResumeUpload && $response->statusCode === 612) { + return $this->uploadV2($fname); } + if (!$response->ok() || !isset($ret['md5']) || $md5 != $ret['md5']) { + return array(null, new Error($this->currentUrl, $response)); + } + $blockStatus = array('etag' => $ret['etag'], 'partNumber' => $partNumber); + array_push($this->finishedEtags['etags'], $blockStatus); + $partNumber += 1; $uploaded += $blockSize; - if ($this->version == SplitUploadVersion::V2) { - $this->finishedEtags['uploaded'] = $uploaded; - } + $this->finishedEtags['uploaded'] = $uploaded; if ($this->resumeRecordFile !== null) { - if ($this->version == SplitUploadVersion::V1) { - $recordData = array( - 'contexts' => $this->contexts, - 'uploaded' => $uploaded - ); - $recordData = json_encode($recordData); - } elseif ($this->version == SplitUploadVersion::V2) { - $recordData = json_encode($this->finishedEtags); - } else { - throw new \Exception("only support v1/v2 now!"); - } + $recordData = json_encode($this->finishedEtags); if ($recordData) { $isWritten = file_put_contents($this->resumeRecordFile, $recordData); if ($isWritten === false) { @@ -276,13 +371,15 @@ public function upload($fname) } } } - if ($this->version == SplitUploadVersion::V1) { - return $this->makeFile($fname); - } elseif ($this->version == SplitUploadVersion::V2) { - return $this->completeParts($fname, $this->finishedEtags['uploadId'], $encodedObjectName); - } else { - throw new \Exception("only support v1/v2 now!"); + + list($ret, $err) = $this->completeParts($fname, $this->finishedEtags['uploadId'], $encodedObjectName); + if ($err !== null) { + $response = $err->getResponse(); + if ($isResumeUpload && $response->statusCode === 612) { + return $this->uploadV2($fname); + } } + return array($ret, $err); } /** @@ -313,15 +410,25 @@ private function fileUrl($fname) /** * 创建文件 + * + * @param string $fname 文件名 + * @return array{array | null, Error | null} */ private function makeFile($fname) { $url = $this->fileUrl($fname); - $body = implode(',', $this->contexts); + $body = implode(',', array_map(function ($ctx) { + return $ctx['ctx']; + }, $this->contexts)); $response = $this->post($url, $body); if ($response->needRetry()) { $response = $this->post($url, $body); } + if ($response->statusCode === 200 || $response->statusCode === 701) { + if ($this->resumeRecordFile !== null) { + @unlink($this->resumeRecordFile); + } + } if (!$response->ok()) { return array(null, new Error($this->currentUrl, $response)); } @@ -377,9 +484,22 @@ private function uploadPart($block, $partNumber, $uploadId, $encodedObjectName, $url = $this->host . '/buckets/' . $this->bucket . '/objects/' . $encodedObjectName . '/uploads/' . $uploadId . '/' . $partNumber; $response = $this->put($url, $block, $headers); + if ($response->statusCode === 612) { + if ($this->resumeRecordFile !== null) { + @unlink($this->resumeRecordFile); + } + } return $response; } + /** + * 完成分片上传V2 + * + * @param string $fname 文件名 + * @param int $uploadId 由 {@see initReq} 获取 + * @param string $encodedObjectName 经过编码的存储路径 + * @return array{array | null, Error | null} + */ private function completeParts($fname, $uploadId, $encodedObjectName) { $headers = array( @@ -418,6 +538,11 @@ private function completeParts($fname, $uploadId, $encodedObjectName) if ($response->needRetry()) { $response = $this->postWithHeaders($url, $jsonBody, $headers); } + if ($response->statusCode === 200 || $response->statusCode === 612) { + if ($this->resumeRecordFile !== null) { + @unlink($this->resumeRecordFile); + } + } if (!$response->ok()) { return array(null, new Error($this->currentUrl, $response)); } diff --git a/src/Qiniu/functions.php b/src/Qiniu/functions.php index 6f87905f..3c03455e 100644 --- a/src/Qiniu/functions.php +++ b/src/Qiniu/functions.php @@ -24,7 +24,7 @@ function crc32_file($file) /** * 计算输入流的crc32检验码 * - * @param $data 待计算校验码的字符串 + * @param $data string 待计算校验码的字符串 * * @return string 输入字符串的crc32校验码 */ diff --git a/tests/Qiniu/Tests/ResumeUpTest.php b/tests/Qiniu/Tests/ResumeUpTest.php index ac4bcaca..69ff2771 100755 --- a/tests/Qiniu/Tests/ResumeUpTest.php +++ b/tests/Qiniu/Tests/ResumeUpTest.php @@ -56,7 +56,6 @@ public function test4ML() ); $this->assertNull($error); $this->assertNotNull($ret['hash']); - unlink($resumeFile); $domain = getenv('QINIU_TEST_DOMAIN'); $response = Client::get("http://$domain/$key"); @@ -85,7 +84,6 @@ public function test4ML2() ); $this->assertNull($error); $this->assertNotNull($ret['hash']); - unlink($resumeFile); $domain = getenv('QINIU_TEST_DOMAIN'); $response = Client::get("http://$domain/$key"); @@ -166,7 +164,6 @@ public function testResumeUploadWithParams() $this->assertEquals("val_1", $ret['var_1']); $this->assertEquals("val_2", $ret['var_2']); $this->assertEquals(basename($tempFile), $ret['fname']); - unlink($resumeFile); $domain = getenv('QINIU_TEST_DOMAIN'); $response = Client::get("http://$domain/$key"); @@ -208,7 +205,6 @@ public function testResumeUploadV2() ); $this->assertNull($error); $this->assertNotNull($ret['hash']); - unlink($resumeFile); $domain = getenv('QINIU_TEST_DOMAIN'); $response = Client::get("http://$domain/$key"); @@ -242,7 +238,6 @@ public function testResumeUploadV2WithParams() $this->assertEquals("val_1", $ret['var_1']); $this->assertEquals("val_2", $ret['var_2']); $this->assertEquals(basename($tempFile), $ret['fname']); - unlink($resumeFile); $domain = getenv('QINIU_TEST_DOMAIN'); $response = Client::get("http://$domain/$key"); @@ -301,7 +296,6 @@ public function testResumeUploadWithInvalidVersion() $this->assertTrue($isRightException); } - unlink($resumeFile); unlink($tempFile); } $this->assertEquals(count($testInvalidVersions), $expectExceptionCount);