Integrando Amazon Alexa con Kubernetes API y Azure Functions

¿Imaginas poder escalar tu cluster de Kubernetes o darle órdenes a un ejército de bots sólo con tu voz? ¡Publicación y explicación del código que Alberto Marcos y servidor utilizamos en la Global Azure Bootcamp 2019!

10 min de lectura
Integrando Amazon Alexa con Kubernetes API y Azure Functions

Ya han pasado varios meses desde que se celebrase la Global Azure Bootcamp 2019 y he caído en la cuenta de que no había hablado en detalle del código del experimento que os hicimos Alberto Marcos y servidor en directo. Hace unos pocos días publiqué el código en este repositorio de Github, así que ya puedo hablaros de ello.

¿Qué hicimos?

Utilizamos OpenKore, un cliente y herramienta de automatización de código abierto para Ragnarok Online, o dicho de otra manera, un bot. Con este software corriendo como contenedor Docker en un cluster de Azure Kubernetes Service hicimos una serie de experimentos:

  • Creamos un plugin para OpenKore que se conecta a un Azure Storage Queue, retira los comandos que encuentran en la cola -acciones para el bot- y los ejecuta.
  • Con la idea de poder introducir comandos en la cola usando nuestra voz, creamos una skill de Amazon Alexa. Esta pieza transforma los comandos de voz en intenciones que son recibidas por la siguiente pieza.
  • Azure Functions, que transforma las intenciones que nos facilita Alexa y las convierte en llamadas a la API que corresponda. En nuestro caso podían ser dos: la API de Kubernetes o bien la cola de Azure Storage mencionada en el primer punto, donde el bot recogerá órdenes para procesar.

Con todo ello conseguimos dos objetivos a través de comandos de voz de Alexa: escalar el número de bots en el servidor vía API de Kubernetes y dar órdenes a los bots que ejecutarán dentro del juego.

Hicimos también un experimento adicional con Azure TextAnalytics y Azure LogicApps que también está publicado, pero contaré en otro artículo.

Todo lo que hicimos aquí se realizó de forma experimental para propósitos de demostración técnica, por lo que no fuimos cuidadosos con el código, no realizamos ninguna optimización ni tampoco nos preocupamos de ninguna práctica de seguridad. Por otro lado, ni Alberto Marcos ni servidor somos desarrolladores expertos, ya que estamos más orientados a la rama de la infraestructura.

Si decides usar alguna parte del código aquí expuesto, es responsabilidad tuya tener todos estos aspectos en cuenta.

Implementando la skill de Alexa

He quedado muy gratamente sorprendido de lo sencillo que es desarrollar skills para Alexa, el entorno de desarrollo es muy intuitivo y en pocos minutos estaremos haciendo nuestras skills.

Todo se desarrolla sobre la premisa de que cuando queremos interactuar con un sistema informático mediante voz, lo que en realidad hacemos es trasmitirle una orden, o mejor dicho, una intención. Esto es justo lo que hace Alexa, en base una petición de voz, la transforma en una intención con parámetros que podemos tratar programáticamente.

¿Y cómo le declaramos todo esto a Alexa? Mediante formato JSON o bien utilizando una interfaz gráfica para definir cada intención.

El JSON de la skill que utilizamos en la demo fue el siguiente:

{
  "interactionModel": {
    "languageModel": {
      "invocationName": "botijo dios",
      "intents": [
        {
          "name": "AMAZON.CancelIntent",
          "samples": [
            "Cancelar"
          ]
        },
        {
          "name": "AMAZON.HelpIntent",
          "samples": [
            "Ayuda",
            "Dame instrucciones"
          ]
        },
        {
          "name": "AMAZON.StopIntent",
          "samples": [
            "Para",
            "Salir"
          ]
        },
        {
          "name": "AMAZON.NavigateHomeIntent",
          "samples": []
        },
        {
          "name": "EmotionWhistleIntent",
          "slots": [],
          "samples": [
            "Silba",
            "Silbad",
            "Cantad"
          ]
        },
        {
          "name": "EmotionHearthIntent",
          "slots": [],
          "samples": [
            "Vamos a comer pizza",
            "Qué pensáis del evento"
          ]
        },
        {
          "name": "EmotionIdeaIntent",
          "slots": [],
          "samples": [
            "Pensad en algo útil",
            "Piensa en algo útil",
            "Qué ideas se os ocurren"
          ]
        },
        {
          "name": "SitIntent",
          "slots": [],
          "samples": [
            "Sentaos",
            "Siéntate"
          ]
        },
        {
          "name": "StandupIntent",
          "slots": [],
          "samples": [
            "Levantaos",
            "Levanta"
          ]
        },
        {
          "name": "TalkIntent",
          "slots": [
            {
              "name": "Message",
              "type": "AMAZON.SearchQuery"
            }
          ],
          "samples": [
            "Decid {Message}",
            "Dí {Message}"
          ]
        },
        {
          "name": "AttackIntent",
          "slots": [],
          "samples": [
            "Atacad",
            "Ataca"
          ]
        },
        {
          "name": "NoAttackIntent",
          "slots": [],
          "samples": [
            "Dejad de atacar",
            "No ataquéis",
            "Deja de atacar",
            "No ataques"
          ]
        },
        {
          "name": "CurrentNumberIntent",
          "slots": [],
          "samples": [
            "Dime cuantos bots hay conectados",
            "Dime cuantos socios hay conectados",
            "Dime cuantos amigos hay conectados"
          ]
        },
        {
          "name": "ScaleIntent",
          "slots": [
            {
              "name": "number",
              "type": "AMAZON.NUMBER"
            }
          ],
          "samples": [
            "Cambia el número de socios a {number}",
            "Llama a {number} socios",
            "Llama a {number} amigos",
            "Invoca a {number} bots",
            "Reduce el número de socios a {number}"
          ]
        },
        {
          "name": "AMAZON.PauseIntent",
          "samples": []
        },
        {
          "name": "AMAZON.ResumeIntent",
          "samples": []
        },
        {
          "name": "ReunitedIntent",
          "slots": [],
          "samples": [
            "Reune a los socios",
            "Junta todos los amigos",
            "Reune a los bots",
            "Reuníos"
          ]
        }
      ],
      "types": []
    }
  }
}

Como queremos manejar lo que venga de Alexa mediante Azure Functions y no Amazon Lambda (opción por defecto) no tenemos más que cambiar el Endpoint a nuestra URL de Azure Functions preparada para recibir llamadas de Alexa.

Implementado la integración con Azure Functions

Para poder recepcionar llamadas HTTP de Alexa, nuestro código debe estar preparado para procesarlas. Aquí nos facilitó muchísimo la vida la biblioteca de Alexa.NET que funciona tanto con .NET Standard como .NET Core.

Aunque se puede ver el código de la función aquí, escribo aquí una de las partes más relevantes, que es donde en función de la intención recibida, llevamos a cabo una acción u otra:

switch (intentRequest.Intent.Name) {
    case "CurrentNumberIntent":
        log.LogInformation ("[INFO] CurrentNumberIntent");
        response = ResponseBuilder.Tell ("Socio, hay un total de " + CurrentBotNumber (dbConnStr, log).ToString () +
            " bots conectados.");
        response.Response.ShouldEndSession = true;
        break;
    case "SitIntent":
        log.LogInformation ("[INFO] SitIntent");
        numberResult = IssueCommandToOnlineBots ("sit", dbConnStr, queueConnStr, log);
        response = ResponseBuilder.Tell ("He ordenado a " + numberResult.ToString () + " bots que se sienten.");
        response.Response.ShouldEndSession = true;
        break;
    case "ScaleIntent":
        log.LogInformation ("[INFO] ScaleIntent");
        var numOfBotsStr = intentRequest.Intent.Slots["number"].Value;
        numberResult = await ScaleCommandToBots (numOfBotsStr, k8sApiUri, k8sBearerToken, k8sDeploymentName, log);
        response = ResponseBuilder.Tell ("¡Llamando al ejercito! Has pedido escalar a: " + numOfBotsStr + " bots");
        response.Response.ShouldEndSession = true;
        break;
    case "ReunitedIntent":
        log.LogInformation ("[INFO] ReunitedIntent");
        numberResult = IssueCommandToOnlineBots ("move 290 185", dbConnStr, queueConnStr, log);
        response = ResponseBuilder.Tell ("He ordenado a los bots moverse al punto de reunión");
        response.Response.ShouldEndSession = true;
        break;
    case "AttackIntent":
        log.LogInformation ("[INFO] AttackIntent");
        numberResult = IssueCommandToOnlineBots ("a yes", dbConnStr, queueConnStr, log);
        log.LogInformation ("[INFO] AttackIntent, command issued to " + numberResult.ToString () + " bots");
        response = ResponseBuilder.Tell ("¡DRAKARIS!");
        response.Response.ShouldEndSession = true;
        break;
    default:
        break;
}

Como podéis ver, recibimos un INTENT de Alexa, que transformamos en una llamada a un método. ¡Así convertimos la voz en acciones de código reales!

Implementando llamadas a Kuberentes

El Azure Function mencionado interactúa a su vez con nuestro cluster de Kubernetes -que se encuentra en AKS- a través de su API REST. La arquitectura de alto nivel sería:
alexaazure1

Podéis ver el case que se ocupa de la intención de escalado en el código anterior:

case "ScaleIntent":
    log.LogInformation ("[INFO] ScaleIntent");
    var numOfBotsStr = intentRequest.Intent.Slots["number"].Value;
    numberResult = await ScaleCommandToBots (numOfBotsStr, k8sApiUri, k8sBearerToken, k8sDeploymentName, log);
    response = ResponseBuilder.Tell ("¡Llamando al ejercito! Has pedido escalar a: " + numOfBotsStr + " bots");
    response.Response.ShouldEndSession = true;
    break;

Una vez más hay una biblioteca para .NET que nos facilita bastante la vida: KubeClient.

El código en nuestro Azure Functions que realiza la operación es:

public static async Task<int> ScaleCommandToBots (string numOfBotsStr, string k8sApiUri, string k8sBearerToken,
    string k8sDeploymentName, ILogger log) {
    try {
        log.LogInformation ("[INFO] Attempting to scale to " + numOfBotsStr + " pods.");
        var parsedNumOfBots = int.Parse (numOfBotsStr);
        var kubernetesClient = new KubernetesClient (log);
        await kubernetesClient.Scale (parsedNumOfBots, k8sApiUri, k8sBearerToken, k8sDeploymentName);

        return parsedNumOfBots;
    } catch (Exception ex) {
        log.LogError ("[ERROR] " + ex.Message);
        return -1;
    }
}

La implementación que hicimos fue mínima y extremadamente simple. Se puede ver aquí, siendo los métodos que realizan la modificación del deployment los siguientes:

public async Task<int> Scale (int numberOfPods, string k8sApiUri, string k8sBearerToken, string k8sDeploymentName) {
    try {
        KubeClientOptions options = new KubeClientOptions () {
            ApiEndPoint = new Uri (k8sApiUri),
            AuthStrategy = KubeAuthStrategy.BearerToken,
            AccessToken = k8sBearerToken,
            AllowInsecure = true
        };

        using (KubeApiClient client = KubeApiClient.Create (options)) {
            var deployments = await client.DeploymentsV1 ().List ();
            var currentDeployment = deployments.First (i => i.Metadata.Name == k8sDeploymentName);
            DeploymentV1 updatedDeployment = await UpdateDeployment (client, currentDeployment, numberOfPods);
        }

        return ExitCodes.Success;
    } catch (Exception unexpectedError) {
        log.LogError ("[ERROR] K8s unable to scale deployment: " + unexpectedError.ToString ());
        return ExitCodes.UnexpectedError;
    }
}

static async Task<DeploymentV1> UpdateDeployment (IKubeApiClient client, DeploymentV1 existingDeployment, int numberOfPods) {
    DeploymentV1 updatedDeployment = await client.DeploymentsV1 ().Update (existingDeployment.Metadata.Name, kubeNamespace : existingDeployment.Metadata.Namespace, patchAction : patch => {
        patch.Replace (
            path: deployment => deployment.Spec.Replicas,
            value: numberOfPods
        );
    });

    updatedDeployment = await client.DeploymentsV1 ().Get (updatedDeployment.Metadata.Name, updatedDeployment.Metadata.Namespace);
    return updatedDeployment;
}

El resultado se puede ver aquí:

Implementando órdenes a OpenKore vía Azure Storage Queues

Igual que pasaba con la API de Kubernetes, necesitábamos alguna forma de trasladar los comandos recibidos por Alexa a OpenKore, el cual tiene una interfaz de plugins muy potente que facilita enormemente el desarrollo de funcionalidad para los bots.

Como OpenKore está desarrollado con Perl, el plugin tiene que estar hecho bajo el mismo lenguaje. Esto lejos de ser un impedimento, resultó ser una experiencia más agradable de lo esperado, dado la riqueza de módulos CPAN, que hacen que Perl puede hacer prácticamente de todo.

La primera aproximación fue crear un pequeño servicio web REST que escuchase las peticiones que le llegasen de Alexa, pero por la propia arquitectura de OpenKore, los plugins se ejecutan de forma síncrona y por tanto la implementación de la escucha de un servicio REST me bloqueaba la ejecución del bot. Antes de lanzarme a un apasionante mundo de programación asíncrona en Perl, se me ocurrió una aproximación más sencilla: Azure Storage Queues.

Perl es un lenguaje potente y estupendo para consumir servicios web y... recordemos que en Azure podemos operar con todos sus servicios a través de la API REST de Azure Resource Manager, así que... ¡ya tenía forma de proceder seleccionada!

Desarrollando con Perl el plugin de Azure Storage Queues para OpenKore

Azure Storage provee de bibliotecas para trabajar con distintos lenguajes de desarrollo... pero Perl no se encuentra entre ellos. No obstante fue fácil desarrollar una pequeña biblioteca que me diera las funciones que necesitaba. Está publicada aquí, y hace meses publiqué un artículo hablando de ello.

Armado con dicha biblioteca hice lo siguiente:

  • Un pequeño script que genera tantas Azure Storage Queues como necesite. En mi caso una para cada bot.
  • El plugin de OpenKore que cada bot tiene incorporado. El plugin recibe mendiante variable de entorno (recordad que nos ejecutamos mediante Docker o Kubernetes) los datos de la Storage Account y la cola a conectarse.

El siguiente diagrama lo expone gráficamente:

alexaazure2

El núcleo del código es el siguiente:

package azureCognitive;

use strict;
use Plugins;
use Actor;
use Log qw(message);
use Digest::SHA qw(hmac_sha256_hex);
use REST::Client;
use lib $Plugins::current_plugin_folder;
use Azure::StorageQueue;

my %azureStorageQueue = (
    'AccountName' => $ENV{'STORAGE_ACCOUNT_NAME'},
    'QueueName'   => $ENV{'STORAGE_QUEUE_NAME'},
    'ApiVersion'  => '2018-03-28',
    'AccountKey'  => $ENV{'STORAGE_ACCOUNT_KEY'},
);
my $lastStorageQueueCallEpoch = time();

my $client = REST::Client->new();
Plugins::register("azurecognitive", "Azure Cognitive Plugin", \&on_unload, \&on_reload);
my $hooks = Plugins::addHooks(
    ['packet_pubMsg', \&inbound_pubMsg],
    ['mainLoop_post', \&AI_post],
);

sub on_unload {
    message("\nAzure Cognitive Plugin is unloading...\n\n");
	Plugins::delHooks($hooks);
    undef $hooks;
}

sub on_reload {
	&on_unload;
}

sub AI_post {
    if ((time() - $lastStorageQueueCallEpoch) > 0) {
        my @messagesArray = Azure::StorageQueue::Get_AzureStorageQueueMessages(\%azureStorageQueue, $client, 1, 0);
        if (scalar @messagesArray > 0)
        {
            foreach (@messagesArray) {
                my %message = %$_;
                message("[azurecognitive] Running command $message{'MessageText'}\n");
                Commands::run($message{'MessageText'});
                my $deleteResult = Azure::StorageQueue::Delete_AzureStorageQueueMessage(\%azureStorageQueue, $client, $message{'MessageId'}, $message{'PopReceipt'});
                if ($deleteResult) {
                    message("[azurecognitive] Message $message{'PopReceipt'} WAS NOT deleted\nResponse message: $deleteResult\n");
                } else {
                    message("[azurecognitive] Message $message{'PopReceipt'} was successfully deleted\n");
                }
            }
        } else {
            message(".");
        }
    }
    $lastStorageQueueCallEpoch = time();
}

Este plugin hace lo siguiente:

  1. Recibe los datos de la Storage Account mediante variable de entorno.
  2. Registra el gancho de mainLoop_post, donde la función AI_post será llamada continuamente.
  3. Dentro de la función AI_post forzamos a que pase un mínimo de tiempo entre una llamada y otra.
  4. Consultamos la Azure Storage Queue mediante el método Azure::StorageQueue::Get_AzureStorageQueueMessages() y procesamos todos los mensajes que haya en ella.
  5. Mandamos al bot ejecutar el comando que hemos retirado mediante Commands::run($message{'MessageText'});.
  6. Eliminamos de la cola el mensaje que hemos procesado mediante Azure::StorageQueue::Delete_AzureStorageQueueMessage().

Con OpenKore listo para escuchar los comandos que algo dejase en las colas y Alexa escuchando la voz del usuario, sólo nos faltaba unir ambos elementos, ¡así que volvemos a Azure Functions!

Escribiendo en las colas de Azure Storage mediante Azure Functions

Como se puede presuponer, desde C# resultó ser bastante más sencillo por tenter bibliotecas y SDKs para hacer las llamadas necesarias. Podemos ver en el código que con el plugin construido y funcionando es sencillo enviar órdenes a todos los bots insertando mensajes en sus respectivas colas.

Veamos por ejemplo como procesamos la orden de reunirse en un punto del mapa. En primer lugar tenemos el case del Intent correspondiente:

case "ReunitedIntent":
    log.LogInformation("[INFO] ReunitedIntent");
    numberResult = IssueCommandToOnlineBots("move 290 185", dbConnStr, queueConnStr, log);
    response = ResponseBuilder.Tell("He ordenado a los bots moverse al punto de reunión");
    response.Response.ShouldEndSession = true;
    break;

Esto nos lleva al método IssueCommandToOnlineBots(), que tiene el siguiente aspecto:

public static int IssueCommandToOnlineBots (string command, string dbConnStr, string queueConnStr, ILogger log) {
    try {
        List<string> onlineBotList = GetRathenaOnlineChars (dbConnStr, log, "botijo");
        log.LogInformation ("[INFO] Attempting to issue command to " + onlineBotList.Count.ToString () + " bots.");
        InsertAllAzureStorageQueuesMessage (queueConnStr, onlineBotList, command, log);
        return onlineBotList.Count;
    } catch (Exception ex) {
        log.LogError ("[ERROR] " + ex.Message);
        return -1;
    }
}

Como tenemos aproximadamente 5.000 bots, no vamos a mandar la orden indiscriminadamente a todos sino sólo a los que estén conectados. Obtenemos esa información y es cuando InsertAllAzureStorageQueuesMessage() hace la inserción en la lista que le facilitemos.

public static async void InsertAllAzureStorageQueuesMessage (string queueConnStr, List<string> queueNames, string message, ILogger log) {
    await queueNames.ParallelForEachAsync (async currentQueue => {
        await InsertAzureStorageQueueMessage (queueConnStr, currentQueue, message, log);
    }, maxDegreeOfParalellism : 100);
}

public static async Task InsertAzureStorageQueueMessage (string queueConnStr, string queueName, string message, ILogger log) {
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse (queueConnStr);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient ();
    CloudQueue queue = queueClient.GetQueueReference (queueName);
    string base64message = Base64Encode (message);
    try {
        await queue.AddMessageAsync (new CloudQueueMessage (base64message, true));
    } catch (Exception ex) {
        log.LogError ("[ERROR]: " + ex.Message);
    }
}

Para que el rendimiento sea aceptable en la demo y lleguemos a los 300 bots en poco tiempo, realizamos llamadas asíncronas en paralelo para cada cola mediante ParallelForEachAsync() que implementa System.Collections.Async.

Con todas las piezas unidas, ¡ya estamos listos para funcionar! ¿El resultado? Tan gráfico como lo siguiente:

Conclusiones

Tecnologías tan dispares como Alexa, Perl, C#, OpenKore, Kubernetes y Azure pueden convivir en armonía gracias a todos los estándares y mecanismos de interoperabilidad que tenemos a día de hoy. Además, hemos podido comprobar como la potencia de Kubernetes y su desarrollo orientado a API permita que las propias aplicaciones interactúen con las características propias de un datacenter dentro del cluster, realizando operaciones como podría ser el escalado.

Agradecimientos

  • A Alberto Marcos, mi compañero de viaje en esta aventura.
  • A Álvaro Marcos, que nos ayudó con las lladas a Kubernetes desde C#.
  • A Beatriz Sebastián, que me ayudó con un pequeño script de SQL de mySQL que sincronizaba entre tablas el estado online de los usuarios.

Happy Alexa-Azure-Kubernetes-Perl!