1: <?php
2: /**
3: * Copyright 2012-2014 Rackspace US, Inc.
4: *
5: * Licensed under the Apache License, Version 2.0 (the "License");
6: * you may not use this file except in compliance with the License.
7: * You may obtain a copy of the License at
8: *
9: * http://www.apache.org/licenses/LICENSE-2.0
10: *
11: * Unless required by applicable law or agreed to in writing, software
12: * distributed under the License is distributed on an "AS IS" BASIS,
13: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14: * See the License for the specific language governing permissions and
15: * limitations under the License.
16: */
17:
18: namespace OpenCloud\ObjectStore\Upload;
19:
20: use Guzzle\Http\EntityBody;
21: use Guzzle\Http\ReadLimitEntityBody;
22:
23: /**
24: * A transfer type which executes in a concurrent fashion, i.e. with multiple workers uploading at once. Each worker is
25: * charged with uploading a particular chunk of data. The entity body is fragmented into n pieces - calculated by
26: * dividing the total size by the individual part size.
27: *
28: * @codeCoverageIgnore
29: */
class ConcurrentTransfer extends AbstractTransfer
{
    /**
     * Uploads the entity body in batches of parts until the transfer state holds every part.
     *
     * Each pass queues at most one request per worker, sends the whole batch through the client and records one
     * TransferPart per returned response. The loop ends once the number of recorded parts reaches the total
     * number of parts (content length divided by part size, rounded up).
     *
     * @throws \RuntimeException If a pass queues no requests at all, because the transfer could not progress
     */
    public function transfer()
    {
        $totalParts = (int) ceil($this->entityBody->getContentLength() / $this->partSize);
        $workers    = min($totalParts, $this->options['concurrency']);
        $parts      = $this->collectParts($workers);

        while ($this->transferState->count() < $totalParts) {
            // Snapshot the completed count once so offsets and part numbers in this pass agree with each other
            $completedParts = $this->transferState->count();
            $requests       = array();

            // Iterate over number of workers until total completed parts is what we need it to be
            for ($i = 0; $i < $workers && ($completedParts + $i) < $totalParts; $i++) {
                // Offset is the current pointer multiplied by the standard chunk length
                $offset = ($completedParts + $i) * $this->partSize;
                $parts[$i]->setOffset($offset);

                // If this segment is empty (i.e. buffering a half-full chunk), break the iteration
                if ($parts[$i]->getContentLength() == 0) {
                    break;
                }

                // Add this to the request queue for later processing; part numbers are 1-based
                $requests[] = TransferPart::createRequest(
                    $parts[$i],
                    $completedParts + $i + 1,
                    $this->client,
                    $this->options
                );
            }

            // With nothing queued, no part would be recorded and the enclosing loop would spin forever
            // (e.g. a concurrency option of zero, or a first segment that reports no content).
            if (empty($requests)) {
                throw new \RuntimeException(sprintf(
                    'Unable to queue any upload requests: %d of %d parts completed',
                    $completedParts,
                    $totalParts
                ));
            }

            // Iterate over our queued requests and process them
            foreach ($this->client->send($requests) as $response) {
                // Add this part to the TransferState
                $this->transferState->addPart(TransferPart::fromResponse($response));
            }
        }
    }

    /**
     * Partitions the entity body into an array - each worker is represented by a key, and the value is a
     * ReadLimitEntityBody object, whose read limit is fixed based on this object's partSize value. This will always
     * ensure the chunks are sent correctly.
     *
     * The first entry wraps this transfer's own entity body; every further worker gets a separate read-only
     * handle opened on the same URI. At least one entry is always returned, even when $workers is below one.
     *
     * @param int $workers The total number of workers
     * @return array The worker array
     */
    private function collectParts($workers)
    {
        $uri = $this->entityBody->getUri();

        $array = array(new ReadLimitEntityBody($this->entityBody, $this->partSize));

        for ($i = 1; $i < $workers; $i++) {
            // Need to create a fresh EntityBody, otherwise you'll get weird 408 responses
            $array[] = new ReadLimitEntityBody(new EntityBody(fopen($uri, 'r')), $this->partSize);
        }

        return $array;
    }
}
94: