pubsub/app/app.php (83 lines of code) (raw):

<?php /** * Copyright 2016 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace Google\Cloud\Samples\PubSub; use DI\Container; use Google\Cloud\PubSub\PubSubClient; use Google\Cloud\Datastore\DatastoreClient; use Psr\Http\Message\ResponseInterface as Response; use Psr\Http\Message\ServerRequestInterface as Request; use Slim\Factory\AppFactory; use Slim\Views\Twig; // Create Container AppFactory::setContainer($container = new Container()); $container->set('view', function () { return Twig::create(__DIR__); }); // Create App $app = AppFactory::create(); // Display errors $app->addErrorMiddleware(true, true, true); $app->get('/', function (Request $request, Response $response, $args) use ($container) { return $container->get('view')->render($response, 'pubsub.html.twig', [ 'project_id' => $container->get('project_id'), ]); }); $app->get('/fetch_messages', function (Request $request, Response $response, $args) use ($container) { // get PUSH pubsub messages $projectId = $container->get('project_id'); $subscriptionName = $container->get('subscription'); $datastore = $container->get('datastore'); $query = $datastore->query()->kind('PubSubPushMessage'); $messages = []; $pushKeys = []; foreach ($datastore->runQuery($query) as $pushMessage) { $pushKeys[] = $pushMessage->key(); $messages[] = $pushMessage['message']; } // delete PUSH messages if ($pushKeys) { $datastore->deleteBatch($pushKeys); } # [START gae_flex_pubsub_index] // get PULL pubsub messages $pubsub = new PubSubClient([ 'projectId' => $projectId, ]); $subscription = $pubsub->subscription($subscriptionName); $pullMessages = []; foreach ($subscription->pull(['returnImmediately' => true]) as $pullMessage) { $pullMessages[] = $pullMessage; $messages[] = $pullMessage->data(); } // acknowledge PULL messages if ($pullMessages) { $subscription->acknowledgeBatch($pullMessages); } # [END gae_flex_pubsub_index] $response->getBody()->write(json_encode($messages)); return $response; }); $app->post('/receive_message', function (Request $request, Response $response, $args) use ($container) { // pull the message from the post body $json = json_decode($request->getContent(), true); if ( !isset($json['message']['data']) || !$message = base64_decode($json['message']['data']) ) { return new Response('', 400); } // store the push message in datastore $datastore = $container->get('datastore'); $message = $datastore->entity('PubSubPushMessage', [ 'message' => $message ]); $datastore->insert($message); return $response; }); $app->post('/send_message', function (Request $request, Response $response, $args) use ($container) { $projectId = $container->get('project_id'); $topicName = $container->get('topic'); # [START gae_flex_pubsub_push] if ($message = (string) $request->getBody()) { // Publish the pubsub message to the topic $pubsub = new PubSubClient([ 'projectId' => $projectId, ]); $topic = $pubsub->topic($topicName); $topic->publish(['data' => $message]); return $response->withStatus(204); } # [END gae_flex_pubsub_push] return $response->withStatus(400); }); $container->set('datastore', function () use ($container) { return new DatastoreClient([ 'projectId' => $container->get('project_id'), ]); }); return $app;