AKKA

Real Life Akka

László Zoltán • 2019. október 17.

 

Számos kulcsfontosságú dolgot érdemes figyelembe venni, ha akka-val kezdünk dolgozni. Az alább összegyűjtött példák/tanácsok segítenek elindulni a helyes irányba, hogy az akka alapú szoftverek éles üzemben is megállják a helyüket.

Kerüljük a megosztott változtatható állapotot

Egyszerűen fogalmazva, ne csináljunk olyan üzenettípusokat, amelyek mezői megváltoztathatóak. Legyen minden mező “final” és legyünk különös tekintettel a Collectionökre. Ha egy üzenetben a Collections framework valamely adatszerkezetét szeretnénk tárolni, akkor gondoskodjunk róla, hogy az üzenet létrehozásakor ne az eredeti példányt, hanem annak egy immutable másolatát tároljunk el.

Hogy miért van erre szükség? Aktorok közötti kommunikáció során, ha az egyik aktor fenntart egy referenciát a másik aktor számára küldött üzenetre, majd ennek az üzenetnek a tartalmát módosítja a másik aktor, akkor nagy valószínűséggel olyan helyzet alakul ki, amit nem kívántunk. Mivel az aktorok párhuzamosan dolgoznak, ezért ebben az esetben mindenképp gondoskodnunk kellene az üzenet szálbiztos adatmódosításról, de ezt felejtsük is el. (Ha Scalat használunk, akkor az immutabilitást a case class-ok biztosítják.)

Ha valahol synchronized blokkokat/metódusokat kellene használni, vagy valamilyen egyéb konkurenciakezelést kellene használni (Mutex, Semaphore, ReadWriteLock, szálbiztos adatszerkezetek, stb), akkor rossz az irány! Akka környezetben ilyesmire nincs szükség. Az aktor modell egyik fő célja, az ilyen eszközök használatának mellőzése.

Rossz request
public class AddPhoneNumbersRequest {

    private String username;
    private List<String> phoneNumbers;

    public String getUsername() {
        return username;
    }

    public AddPhoneNumbersRequest setUsername(String username) {
        this.username = username;
        return this;
    }

    public List<String> getPhoneNumbers() {
        return phoneNumbers;
    }

    public AddPhoneNumbersRequest setPhoneNumbers(List<String> phoneNumbers) {
        this.phoneNumbers = phoneNumbers;
        return this;
    }
}
Helyes request
public class AddPhoneNumbersRequest {

    private final String username;
    private final List<String> phoneNumbers;

    public AddPhoneNumbersRequest(String username, List<String> phoneNumbers) {
        this.username = username;
        this.phoneNumbers = Optional.ofNullable(phoneNumbers).map(List::copyOf).orElseGet(Collections::emptyList);
    }

    public String getUsername() {
        return username;
    }

    public List<String> getPhoneNumbers() {
        return phoneNumbers;
    }
}

Mi a helyzet az aktor belső állapotával? Az a legjobb, ha az aktor stateless és akkor erre nincs is gondunk. Ugyanakkor ha az aktorunk stateful, sose módosítsuk az állapotát callback metódusból. Az állapotmódosítást minden esetben egy beérkező üzenet feldolgozása során kell elintéznünk a feldolgozást végző szálon!

Rossz megoldás
public class UserEmailActor extends AbstractActor {

    private final Map<String, String> userEmailAddressCache = new HashMap<>();
    private final UserRepository userRepository;

    public UserEmailActor(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public static Props props(UserRepository userRepository) {
        return Props.create(UserEmailActor.class, () -> new UserEmailActor(userRepository));
    }

    private void getEmailOfUser(String username) {
        if (!userEmailAddressCache.containsKey(username)) {
            ActorRef self = getSelf();
            ActorRef sender = getSender();

            CompletableFuture<User> userFuture = userRepository.findOneByUsername(username);
            userFuture
                .thenApply(User::getEmail)
                .whenComplete((userEmail, failure) -> {
                    if (failure == null) {
                        userEmailAddressCache.put(username, userEmail);
                        sender.tell(userEmail, self);
                    } else {
                        sender.tell(new Failure(failure), self);
                    }
                });
        } else {
            getSender().tell(userEmailAddressCache.get(username), getSelf());
        }
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, this::getEmailOfUser)
            .build();
    }
}

A whenComplete belsejében végzett map módosítás aszinkron hívódik, ezért ez problémákat okozhat.

Ask vagy tell?

Alapvetően az aktorok közötti kommunikáció fire&forget alapon működik. Küldünk egy üzenetet az aktornak és várjuk a “csodát”. Hogy hozható össze ez a koncepció a request-response világgal? Erre ad megoldást az ask pattern.

Alap koncepció ask nélkül

Hogy is néz ez ki? A kérdező küld egy üzenetet egy másik aktornak. Mivel minden üzenetküldésnél meg kell adnunk, hogy ki a feladó, ezért az aktor, akinek az üzenetet küldtük tudja, hogy kinek kell válaszolni (a feladónak). Így egy kérdezz-felelek kör úgy néz ki hogy A aktor szól B aktornak, hogy szüksége volna valamilyen adatra , majd B az igénybejelentésre válaszolva elküldi a kért adatokat.

public class A extends AbstractActor {

    private static final Logger LOGGER = LoggerFactory.getLogger(A.class);

    private final ActorRef b = getContext().actorOf(B.props());

    public static Props props() {
        return Props.create(A.class);
    }

    private void askB() {
        b.tell("Who are you?", getSelf());
     }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .matchEquals("Init", ignore -> askB())
            .match(String.class, stringResponse -> LOGGER.debug("Response: [{}]", stringResponse))
            .build();
    }

}

public class B extends AbstractActor {

    public static Props props() {
        return Props.create(B.class);
    }

    private void respond() {
        getSender().tell("I am Groot!", getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .matchAny(ignore -> respond())
            .build();
    }

}

                                    
Kommunikációs minták

A fent leírt kommunikáció még igencsak egyszerű, de vannak összetettebb esetek is. Nézzük ezeket!

 

Az ábrán látható A, B, C, D jelölésű elemek aktorokat reprezentálnak.

A pirossal jelölt első példa a legtöbb esetben egy hibás megoldást jelent. Mi vele a probléma? Aktorok közötti kommunikáció során törekednünk kell arra, hogy fölöslegesen köztes állomásokat ne iktassunk be. Kiemelném, hogy ez nem mindig fölösleges! Abban az esetben ha ez a köztes B aktor valamilyen filterszerű szerepet tölt be, akkor kénytelenek vagyunk így kommunikálni. (Pipes and Filters pattern)

Ha nincs szükségünk arra, hogy B aktor kétirányú filterszerű szerepet töltsön be, vagy a kommunikációt elegendő csak az egyik irányban megszakítani, akkor B-nek csak továbbítania kell az üzenetet C-nek (ActorRef forward metódusával). Mivel B csak továbbította az üzenetet, ezért C úgy látja, hogy az üzenet feladója A. Amikor C válaszol a feladónak, akkor egyből A-nak válaszol. Ez látható a második ábrán.

De mi van ha nem érkezik válasz? (Erre jó lenne valami időkorlát.) Mi van akkor ha nem 2-3 aktor között folyik ez a kommunikáció, hanem 4, vagy több? A harmadik ábrán látható az Akka igazi ereje. Ha tudjuk hogy két művelet párhuzamosan is végrehajtható, akkor ezek a műveletek elkülöníthetőek két külön aktorba, C és D aktorokba. C és D aktoroknak B küld üzeneteket (fork), majd “bevárja” az aktorok válaszait (join), C és D válasza alapján állítja össze saját válaszüzenetét A aktor számára.

Az ask minta

Az ask pattern egy Future, vagy CompletionStage objektumot ad vissza, amely akkor válik completed státuszúvá, amikor a kérdezett aktor válaszolt. Teszi ezt úgy hogy létrehoz egy köztes aktort, amely elküldi a kérést, majd várja a választ. Ha érkezik válasz, akkor beállítja a Future megfelelő státuszát.

Ennek az az előnye, hogy ha több aktor válaszüzenete alapján lehetséges csak összeállítani egy aktor válaszát, akkor a Future-ök használata ezt könnyebbé és átláthatóbbá teszi.

Lássuk hogy a korábbi példa hogy néz ki az ask használatával:

 

public class B extends AbstractActor {

    private final ActorRef actorC = getContext().actorOf(C.props());
    private final ActorRef actorD = getContext().actorOf(D.props());

    public static Props props() {
        return Props.create(B.class);
    }

    private void respond() {

        CompletionStage<String> responseOfC = Patterns.ask(actorC, "ask", Duration.ofSeconds(10))
            .thenApply(String.class::cast);
        CompletionStage<String> responseOfD = Patterns.ask(actorD, "ask", Duration.ofSeconds(10))
            .thenApply(String.class::cast);

        Patterns.pipe(
            responseOfC.thenCombine(responseOfD, (c, d) -> c + ' ' + d),
            getContext().getDispatcher()
        ).to(getSender(), getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .matchEquals("ask", ignore -> respond())
            .build();
    }

}

A fent látható B aktor két kérdést indít, egyet C-nek, egyet D-nek. A kapott CompletionStage-ek eredményét kombinálva állítja elő a választ A aktor számára. Az így előállított új választ a pipe pattern segítségével továbbítja.

Az ask másik előnye, hogy minden esetben meg kell adnunk a számára egy időlimitet, amely időlimit alatt a válasznak meg kell érkeznie. Ha nem érkezik meg, akkor a CompletionStage időtúllépsi hibával zárul, amit a megfelelő callback előállításával könnyen tudunk kezelni. Az alap koncepció szerinti példában az aktorok közvetlenül kommunikáltak egymással, de ezekben az esetekben nem tudjuk kezelni azt, amikor nem érkezik válasz, ezért a legtöbb szituációban mindig célszerű az askot használni a közvetlen kommunikáció helyett.

A pipe minta

A pipe egy egyszerűsített megoldás arra, hogy egy CompletionStage eredményét (legyen az sikeres vagy hibás) továbbítsa.

Figyelem! Ha a pipe számára nem adjuk meg hogy ki a feladó (sender), akkor az az aktor, amelyik fogadja a pipe által küldött üzenetet, nem tud válaszolni arra, hiszen nem lesz feladója az üzenetnek.

A pipe minta használatával egyenértékű megoldás az alábbi lenne:

ActorRef self = getSelf();
ActorRef sender = getSender();
responseOfC.thenCombine(responseOfD, (c, d) -> c + ' ' + d)
    .whenComplete((result, failure) -> sender.tell(failure == null ? result : new Failure(failure), self));

Figyelem! Nem mindegy, hogy hol hívjuk meg a getSelf()/self(), és getSender()/sender() metódusokat. Ezeket az értékeket az akka.actor.ActorContext tárolja. Az aktor kontextus csak egy aktoron belül valid, ami azt jelenti hogy az adott aktoron kívül (pl. más aktorból), vagy más szálról ne próbáljunk meg hozzáférni.

A fenti példa helytelen verziója

responseOfC.thenCombine(responseOfD, (c, d) -> c + ' ' + d)
    .whenComplete((result, failure) -> getSender().tell(failure == null ? result : new Failure(failure), getSelf()));

Az alapvető problémát itt is callback jelenti. Mivel a whenComplete aszinkron módon hívódik, ezért lehetséges hogy mire a végrehajtás oda érne, hogy lekérdezze az üzenet feladóját, addigra az aktor már elkezdte feldolgozni egy másik aktor üzenetét. Így gyakran előállhat az az eset, hogy nem a megfelelő aktornak kerül kiküldésre a válaszüzenet.

Hatékony ask használat

Ahogy láthatjuk, ha vigyázunk az állapothatárokra, akkor az ask és a pipe használatával kényelmesen bonyolíthatjuk az aktorok közötti kommunikációt, de ez nincs ingyen! Mivel az ask használatához egy új köztes aktor létrehozására van szükség, ezért számolnunk kell ennek a többletköltségével. Szinte elhanyagolható ez a többletköltség, de a sok kicsi sokra megy, és könnyű túlzásba vinni az ask használatát.

Vegyük elő ismét a kommunikációs minták első két példáját, ezúttal az ask használatával. Tegyük fel, hogy A aktor kérdez B-től, aki a válaszadáshoz megkérdezi C-t, aki ahhoz hogy válaszolni tudjon, szintén kérdez D irányába. D válaszol C-nek, C B-nek, majd B A-nak.

Azzal nincs probléma, ha a szoftver struktúrája nem teszi lehetővé, hogy A aktor közvetlenül a D-től kérdezzen. Azzal sincs probléma, hogy ha B és C aktorok nélkülözhetetlenek, mert valamilyen átalakítást/kiegészítést végeznek, ahhoz hogy D aktorhoz a megfelelő request jusson el.

Problémát az jelent, hogy B és C, illetve C és D aktorok között fölöleges az ask használata. (Első ábra) Helyette az A aktor az ask használatával kérdezzen B-től, B végezze el a szükséges lépéseket, majd továbbítsa a megfelelő üzenetet C számára. C aktor tegyen ugyanígy. (Második ábra) Mivel az üzenetet az aktorok csak továbbították, ezért D aktor úgy fogja látni hogy a feladó az A aktor, így neki is válaszol.

Kódszervezés
Üzenettípusok

Célszerű az aktorok osztályában definiálni minden olyan üzenettípust, amelyet az adott aktor képes fogadni, valamint küldeni. Ez alól kivételt jelent az az eset, ha az egyik aktor képes fogadni a másik aktor válaszüzenetét. Pl.: A aktor képes fogadni B válaszát. Ekkor B-ben célszerű definiálni a válaszüzenet típusát.

Ha vannak olyan üzenettípusok, amelyek hasonló adattartalommal bírnak, akkor ez a módszer könnyen kódduplikációhoz vezet. Ennek megoldására az aktorokon kívül érdemes közös ősosztályt létrehozni, és az aktorspecifikus üzeneteket továbbra is az aktorok osztályában tartani. A specializált üzenettípusok pedig származhatnak a közös ősből.

Ettől függetlenül, ha vannak üzenetek, amelyeket több aktor is képes használni kommunikációja során, akkor ezeket lehet külön kezelni az aktroktól.

Üzenetfogadás és skálázás

Hányféle üzenetet fogadhat egy aktor? Technikailag akárhányat, de ez megfontolandó!

Tegyük fel hogy van egy CryptographyActor, amely két üzenettípust képes fogadni: EncryptionRequest és DecryptionRequest (amelyek mindegyikéhez tartozik egy-egy Response üzenettípus is). Az aktor funkcióját tekintve adat kódolással, illetve dekódolással foglalkozik, és az aktor által használt kriptográfiai eljárás sajátossága, hogy a dekódolás több időt vesz igénybe, mint a kódolás. Nem tűnik logikátlannak, hogy ezt a két, egyébként összetartozó műveletet egy aktorban kezeljük, de ez mégsem a legjobb megoldás. A gondot az jelenti, hogy a dekódolás több időt vesz igénybe, ezért feltarthatja a kódolási kéréseket, amelyek egyébként gyorsabban is teljesíthetőek lennének. Adódik hát, hogy ezt a két műveletet két külön aktorban (EncryptionActor és DecryptionActor) valósítsuk meg, így nem lesznek egymásra közvetlenül hatással, és külön szabályozhatjuk a műveletek számára biztosított erőforrásokat.

Más a helyzet akkor, ha egy aktor csak továbbítja az üzeneteket. Folytatván az előző példát, van egy EncryptionActor és egy DecryptionActor osztályunk. Módosítsuk úgy a korábbi CryptographyActor osztályunkat, hogy az csak továbbítsa a kódolási és dekódolási kéréseket, a megfelelő aktorok felé. Így megőriztük a korábbi struktúrát. Mivel az üzenet továbbítás nem erőforrásigényes és minden üzenetre nézve azonos időn belül teljesíthető, ezért az ilyen diszpécser aktorok, mint a CryptographyActor, nem jelentenek szűk keresztmetszetet.

Érdemes az EncryptionActor és DecryptionActor osztályokat a CryptographyActor gyermekeivé tenni, elkerülendő a lapos aktor hierarchiát.

Ha Springes vagy JavaEE-s megközelítésből szemléljük a dolgot, akkor jó kiindulási alap, ha az aktorokat oly módon szervezzük, hogy egy Bean publikus metódusához egy actort készítünk, illetve magához a Beanhez is egyet, amely tartalmazza a publikus metódusok funkcióját megvalósító aktorokat. Ekkor a Beant reprezentáló aktor továbbíthatja a kéréseket a megfelelő Bean metódust megvalósító aktor felé.

Blokkoló hívások, módszerek

A legveszélyesebb hiba amit el lehet követni! Ha non-blocking rendszert építünk, akkor ne blokkoljuk azt le! Hogy lehet ilyesmit elkövetni? Pl. valamilyen 3rd party API/lib használatával, legacy rendszerekhez való illeszkedéssel, Future eredmények kényszerű bevárásával, Thred.sleep()-pel.

Ha mégis ilyesmire kényszerülnénk, akkor az ilyen megoldásokat jól válasszuk le a rendszerünk többi, non-blocking részétől. Tegyünk a blokkoló hívások köré egy-egy külön aktort, valamint célszerű ezek számára külön, dedikált Dispatchert (Thread poolt) fenntartani. Vegyük figyelembe, hogy ezek az aktorok az üzenetfeldolgozás során hosszabb ideig foglalnak egy szálat. Ha ezek az aktorok ugyanazokat a szálakat használják mint a rendszer többi (non-blocking) része, akkor azzal belassíthatjuk azt is, ami egyébként gyors lenne, mivel a blokkoló üzenetfeldolgozás foglalja az erőforrásokat (szálakat), amelyeket a többi aktor is használna.

Thread.sleep() legtöbbször azért jön elő, hogy adott időközönként ellenőrizzünk, vagy futtassunk valamit. Akka környezetben erre egyértelműen nem ezt a megoldást kell választanunk. Használjuk az Akka ütemezőjét, aminek segítségével adott időközönként, vagy időpontokban küldhetünk üzenetet aktoroknak. Logikusan az ütemezett üzenet feldolgozása során kell elvégeznünk azt az ellenőrzést, vagy feladatot, amelyet egyébként a várakozást követően tennénk meg.

Aszinkron hívások eredményét ne várjuk be, használjuk helyette a különböző callback lehetőségeket, amelyeket az API biztosít!

Használjuk a pipe mintát! Az alábbi példában A aktor kérdez valamit B-től. B ahhoz hogy válaszolni tudjon, C-től kérdez.

A rossz megoldás
public class A extends AbstractActor {

    private static final Logger LOGGER = LoggerFactory.getLogger(A.class);

    private final ActorRef b = getContext().actorOf(B.props());

    public static Props props() {
        return Props.create(A.class);
    }

    private void respond() {
        Patterns.ask(b, "ask", Duration.ofSeconds(10))
            .thenApply(String.class::cast)
            .whenComplete((response, failure) -> LOGGER.debug("Response: [{}], Failure: [{}]", response, failure));
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .matchAny(ignore -> respond())
            .build();
    }

}

public class B extends AbstractActor {

    private final ActorRef c = getContext().actorOf(C.props());

    public static Props props() {
        return Props.create(B.class);
    }

    private void respond() {
        String response = Patterns.ask(c, "ask", Duration.ofSeconds(10))
            .thenApply(String.class::cast)
            .toCompletableFuture()
            .join();

        getSender().tell(response, getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .matchEquals("ask", ignore -> respond())
            .build();
    }

}

public class C extends AbstractActor {

    private void respond() {
        getSender().tell("C", getSelf());
    }

    public static Props props() {
        return Props.create(C.class);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .matchEquals("ask", ignore -> respond())
            .build();
    }
}

Ez azért helytelen megoldás, mert B aktor üzenetfeldolgozása blokkolva van, amíg C válasza meg nem érkezik. Helyette használjuk a pipe mintát, ami lehetővéteszi, hogy C válasza továbbításra kerüljön A számára. Ehhez csak a B aktort kell módosítanunk az alábbi módon:

A helyes megoldás
public class B extends AbstractActor {

    private final ActorRef c = getContext().actorOf(C.props());

    public static Props props() {
        return Props.create(B.class);
    }

    private void respond() {
        Patterns.pipe(
            Patterns.ask(c, "ask", Duration.ofSeconds(10)),
            getContext().getDispatcher()
        ).pipeTo(getSender(), getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .matchEquals("ask", ignore -> respond())
            .build();
    }

}

Több actor system?

Óvatosan! Mivel minden actorsystem létrehoz magának egy dispatchert (thread poolt) ezért, ha több actorsystemet inicializálunk, akkor a létrehozott threadek száma is ezzel együtt nő. Ez nem várt performancia problémákhoz vezethet. Egyébként is fölösleges két actorsystemet létrehozni, ha egy is elég minden problémánk megoldásához. Ha eredetileg is az volt a cél hogy a két külön actorsystem külön szálakból dolgozzon, akkor helyette inkább használjunk külön dispatchert azokhoz az aktorokhoz, amelyeket egyébként a külön actorsystem alatt kezelnénk.

Szinkron naplózás

Problémás eset lehet, ha a naplózást szinkron módon valósítjuk meg.
Jellemzően minden naplózás valami I/O művelettel végződik. (pl.: fájlba írás, adatbázisba írás, vagy egyéb hálózati kommunikáción való adattovábbítás) Ezek nyilván lassabban végrehajtható műveletek, ezért ha szinkron módon történik, akkor az komolyan belassíthatja az aktorok egyébként gyors működését.

Megoldásként vagy használjuk az Akka által biztosított naplózási módszert, vagy ha egyéb eszközt használunk (pl.: SLF4J-Logback), akkor győződjünk meg róla, hogy aszinkron appendert használ-e a naplóbejegyzések perzisztálására.

Hibakezelés

Az Akka az aktorokat hierarchikusan kezeli. Minden, a fejlesztő által létrehozott aktor a User aktor gyermeke és minden ilyen aktornak lehet több gyermeke, amelyeknek szintúgy. Ennek a hibakezelés szempontjából nagy jelentősége van. Ha egy aktor hibát dob az üzenetfeldolgozás során, akkor a szülőaktor felelős a hibakezelésért. A hibakezelés során lehetséges (stratégiától függően), hogy a hibás aktor újraindításra kerül, ami alapvetően azt is jelenti, hogy tranzitívan minden gyermeke újraindításra kerül, ezzel garantálva, hogy egy friss állapotot érhessen el az adott aktor.

Emiatt elkerülendő az a stratégia, hogy mindig csak “top-level” (User aktor alá tartozó) aktrokat hozzunk létre. Érdemes tehát egy aktor aktorfüggőségeit a gyermekeiként kezelni.