Software Integration
Seamless communication — that, among other consequential advantages, is the ultimate goal when integrating your software. And today, integrating modern software means fusing various applications and/or systems — often across distributed environments — with the common goal of unifying isolated data. This effort often signifies the transition of legacy applications to cloud-based systems and messaging infrastructure via microservices and REST APIs. So what's next? Where is the path to seamless communication and nuanced architecture taking us? Dive into our 2023 Software Integration Trend Report and fill the gaps in modern integration practices by exploring trends in APIs, microservices, and cloud-based systems and migrations. You have to integrate to innovate!
With various access control models and implementation methods available, constructing an authorization system for backend service APIs can still be challenging. However, the ultimate goal is to ensure that the correct individual has appropriate access to the relevant resource. In this article, we will discuss how to enable the role-based access control (RBAC) authorization model for your API with the open-source API Gateway Apache APISIX and Open Policy Agent (OPA).

What Is RBAC?

Role-based access control (RBAC) and attribute-based access control (ABAC) are two commonly used access control models for managing permissions and controlling access to resources in computer systems. RBAC assigns permissions to users based on their role within an organization. In RBAC, roles are defined based on the functions or responsibilities of users, and permissions are assigned to those roles. Users are then assigned to one or more roles, and they inherit the permissions associated with those roles. In the API context, for example, a developer role might have permission to create and update API resources, while an end-user role might only have permission to read or execute API resources. In short, RBAC assigns permissions based on user roles, while ABAC assigns permissions based on attributes associated with users and resources. In RBAC, a policy is defined by the combination of a user's assigned role, the actions they are authorized to perform, and the resources on which they can perform those actions.

What Is OPA?

OPA (Open Policy Agent) is a policy engine and a set of tools that provide a unified approach to policy enforcement across an entire distributed system. It allows you to define, manage, and enforce policies centrally from a single point. By defining policies as code, OPA enables easy review, editing, and rollback of policies, facilitating efficient policy management. OPA provides a declarative language called Rego, which allows you to create and enforce policies throughout your stack. When you request a policy decision from OPA, it uses the rules and data that you have provided in a .rego file to evaluate the query and produce a response. The query result is then sent back to you as the policy decision. OPA stores all the policies and the data it needs in its in-memory cache, so it returns results quickly. Here is an example of a simple OPA Rego file:

package example

default allow = false

allow {
    input.method == "GET"
    input.path == "/api/resource"
    input.user.role == "admin"
}

In this example, we have a package called "example" that defines a rule called "allow". The "allow" rule specifies that the request is allowed if the input method is "GET", the requested path is /api/resource, and the user's role is "admin". If these conditions are met, the "allow" rule evaluates to true, allowing the request to proceed.

Why Use OPA and API Gateway for RBAC?

An API gateway provides a centralized location to configure and manage APIs and API consumers. It can be used as a centralized authentication gateway, avoiding the need for each individual service to implement authentication logic inside the service itself. OPA, on the other hand, adds an authorization layer and decouples the policy from the code, which is a distinct benefit for authorization. With this combination, you can add permissions for an API resource to a role. Users may be associated with one or more user roles. Each user role defines a set of permissions (GET, PUT, DELETE) on RBAC resources (defined by URI paths).
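To make the model concrete before wiring anything up, here is a small, purely illustrative Python sketch of the role-to-permission lookup such a policy encodes. The user, role, and permission names mirror the example used later in this article; this is not APISIX or OPA code, just the decision logic in plain Python.

Python

# Illustrative only: the same role/permission lookup the Rego policy later in this article expresses.
user_roles = {
    "jack": ["speaker"],
    "bobur": ["admin"],
}

role_permissions = {
    "speaker": {"read"},
    "admin": {"read", "write"},
}

def is_allowed(username: str, requested_permission: str) -> bool:
    """Return True if any of the user's roles grants the requested permission."""
    return any(
        requested_permission in role_permissions.get(role, set())
        for role in user_roles.get(username, [])
    )

print(is_allowed("jack", "read"))   # True  - speakers can read
print(is_allowed("jack", "write"))  # False - only admins can write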
In the next section, let's learn how to achieve RBAC using these two.

How to Implement RBAC With OPA and Apache APISIX

In Apache APISIX, you can configure routes and plugins to define the behavior of your API. You can use the APISIX opa plugin to enforce RBAC policies by forwarding requests to OPA for decision-making. OPA then makes an authorization decision based on users' roles and permissions in real time. Assume that we have a Conference API where you can retrieve and edit event sessions, topics, and speaker information. A speaker can only read their own sessions and topics, while an admin can add and edit more sessions and topics. Attendees can leave their feedback about a speaker's session via a POST request to /speaker/speakerId/session/feedback, and the speaker can only see it by requesting the GET method of the same URI. The diagram below illustrates the whole scenario:

1. The API consumer requests a route on the API Gateway with its credentials, such as a JWT token in the authorization header.
2. The API Gateway sends consumer data with a JWT header to the OPA engine.
3. OPA evaluates whether the consumer has a right to access the resource by using the policies (roles and permissions) we specify in the .rego file.
4. If the OPA decision is allowed, the request is forwarded to the upstream Conference service.

Next, we install and configure APISIX and define policies in OPA.

Prerequisites

- Docker is used to install the containerized etcd and APISIX.
- curl is used to send requests to the APISIX Admin API. You can also use tools such as Postman to interact with the API.

Step 1: Install Apache APISIX

APISIX can be easily installed and started with the following quickstart script:

curl -sL https://run.api7.ai/apisix/quickstart | sh

Step 2: Configure the Backend Service (Upstream)

To route requests to the backend service for the Conference API, you'll need to configure it by adding an upstream server in Apache APISIX via the Admin API.

curl http://127.0.0.1:9180/apisix/admin/upstreams/1 -X PUT -d '
{
  "name": "Conferences API upstream",
  "desc": "Register Conferences API as the upstream",
  "type": "roundrobin",
  "scheme": "https",
  "nodes": {
    "conferenceapi.azurewebsites.net:443": 1
  }
}'

Step 3: Create an API Consumer

Next, we create a consumer (a new speaker) with the username jack in Apache APISIX and set up the jwt-auth plugin for the consumer with the specified key and secret. This will allow the consumer to authenticate using a JSON Web Token (JWT).

curl http://127.0.0.1:9180/apisix/admin/consumers -X PUT -d '
{
  "username": "jack",
  "plugins": {
    "jwt-auth": {
      "key": "user-key",
      "secret": "my-secret-key"
    }
  }
}'

Step 4: Create a Public Endpoint to Generate a JWT Token

You also need to set up a new route that generates and signs the token using the public-api plugin. In this scenario, the API Gateway acts as an identity provider server to create and verify the token with our consumer jack's key. The identity provider can also be any other third-party service such as Google, Okta, Keycloak, or Ory Hydra.

curl http://127.0.0.1:9180/apisix/admin/routes/jas -X PUT -d '
{
  "uri": "/apisix/plugin/jwt/sign",
  "plugins": {
    "public-api": {}
  }
}'

Step 5: Claim a New JWT Token for the API Consumer

Now we can get a new token for our speaker Jack from the API Gateway using the public endpoint we created. The following curl command generates a new token with Jack's credentials and assigns the role and permission in the payload.
curl -G --data-urlencode 'payload={"role":"speaker","permission":"read"}' http://127.0.0.1:9080/apisix/plugin/jwt/sign?key=user-key -i

After you run the above command, you will receive a token as a response. Save this token somewhere — later we are going to use it to access our new API Gateway endpoint.

Step 6: Create a New Plugin Config

This step involves configuring three APISIX plugins: proxy-rewrite, jwt-auth, and opa.

curl "http://127.0.0.1:9180/apisix/admin/plugin_configs/1" -X PUT -d '
{
  "plugins": {
    "jwt-auth": {},
    "proxy-rewrite": {
      "host": "conferenceapi.azurewebsites.net"
    }
  }
}'

The proxy-rewrite plugin is configured to proxy requests to the conferenceapi.azurewebsites.net host. The OPA authentication plugin is configured to use the OPA policy engine running at http://localhost:8181/v1/data/rbacExample. APISIX also sends all consumer-related information to OPA. We will add the policy .rego file in the OPA configuration section.

Step 7: Create a Route for Conference Sessions

The final step is to create a new route for the Conferences API speaker sessions:

curl "http://127.0.0.1:9180/apisix/admin/routes/1" -X PUT -d '
{
  "name": "Conferences API speaker sessions route",
  "desc": "Create a new route in APISIX for the Conferences API speaker sessions",
  "methods": ["GET", "POST"],
  "uris": ["/speaker/*/topics", "/speaker/*/sessions"],
  "upstream_id": "1",
  "plugin_config_id": 1
}'

The payload contains information about the route, such as its name, description, methods, URIs, upstream ID, and plugin configuration ID. In this case, the route is configured to handle GET and POST requests for two different URIs, /speaker/*/topics and /speaker/*/sessions. The "upstream_id" field specifies the ID of the upstream service that will handle incoming requests for this route, while the "plugin_config_id" field specifies the ID of the plugin configuration to be used for this route.

Step 8: Test the Setup Without OPA

So far, we have set up all the necessary configurations for APISIX to direct incoming requests to the Conference API endpoints, allowing only authorized API consumers. Now, each time an API consumer wants to access an endpoint, they must provide a JWT token to retrieve data from the Conference backend service. You can verify this by hitting the endpoint; note that the domain address we are requesting now is our custom API Gateway, not the actual Conference service:

curl -i http://127.0.0.1:9080/speaker/1/topics -H 'Authorization: {API_CONSUMER_TOKEN}'

Step 9: Run the OPA Service

The next two steps are to run the OPA service using Docker and to upload our policy definition using its API, which can then be used to evaluate authorization policies for incoming requests.

docker run -d --network=apisix-quickstart-net --name opa -p 8181:8181 openpolicyagent/opa:latest run -s

This Docker command runs a container of the latest OPA image. It creates a new container on the existing APISIX network apisix-quickstart-net with the name opa and exposes port 8181, so APISIX can send policy check requests to OPA directly using the address http://opa:8181. Note that OPA and APISIX should run in the same Docker network.

Step 10: Define and Register the Policy

The second step on the OPA side is to define the policies that will be used to control access to API resources. These policies should define the attributes required for access (which users have which roles) and the permissions (which roles have which permissions) that are allowed or denied based on those attributes.
For example, in the configuration below, we tell OPA to check the user_roles table to find the role for jack. This information is sent by APISIX inside input.consumer.username. We also verify the consumer's permission by reading the JWT payload and extracting token.payload.permission from there. The comments describe the steps clearly.

curl -X PUT '127.0.0.1:8181/v1/policies/rbacExample' \
  -H 'Content-Type: text/plain' \
  -d 'package rbacExample

# Assigning user roles
user_roles := {
    "jack": ["speaker"],
    "bobur": ["admin"]
}

# Role permission assignments
role_permissions := {
    "speaker": [{"permission": "read"}],
    "admin": [{"permission": "read"}, {"permission": "write"}]
}

# Helper JWT functions
bearer_token := t {
    t := input.request.headers.authorization
}

# Decode the authorization token to get a role and permission
token = {"payload": payload} {
    [_, payload, _] := io.jwt.decode(bearer_token)
}

# Logic that implements RBAC
default allow = false

allow {
    # Look up the list of roles for the user
    roles := user_roles[input.consumer.username]
    # For each role in that list
    r := roles[_]
    # Look up the permissions list for role r
    permissions := role_permissions[r]
    # For each permission
    p := permissions[_]
    # Check if the permission granted to r matches the user'"'"'s request
    p == {"permission": token.payload.permission}
}'

Step 11: Update the Existing Plugin Config With the OPA Plugin

Once we have defined the policies on the OPA service, we need to update the existing plugin config for the route to use the OPA plugin. We specify the policy name in the policy attribute of the OPA plugin.

curl "http://127.0.0.1:9180/apisix/admin/plugin_configs/1" -X PATCH -d '
{
  "plugins": {
    "opa": {
      "host": "http://opa:8181",
      "policy": "rbacExample",
      "with_consumer": true
    }
  }
}'

Step 12: Test the Setup With OPA

Now we can test the whole setup with the OPA policies in place. If you run the same curl command to access the API Gateway endpoint, APISIX first checks the JWT token as the authentication step and then sends the consumer and JWT token data to OPA to verify the role and permission as the authorization step. Any request without a JWT token or without an allowed role will be denied.

curl -i http://127.0.0.1:9080/speaker/1/topics -H 'Authorization: {API_CONSUMER_TOKEN}'

Conclusion

In this article, we learned how to implement RBAC with OPA and Apache APISIX. We defined a simple custom policy logic to allow or disallow API resource access based on an API consumer's role and permissions. This tutorial also demonstrated how we can extract API consumer-related information in the policy file from the JWT token payload or the consumer object sent by APISIX.
Vector databases are currently all the rage in the tech world, and it isn't just hype. Vector search has become ever more critical due to advances in artificial intelligence that make use of vector embeddings. These embeddings are vector representations of words, sentences, or documents that provide semantic similarity for semantically close inputs by simply looking at a distance metric between the vectors. The canonical example comes from word2vec, in which the embedding of the word "king" is very near the vector obtained by combining the vectors of the words "queen", "man", and "woman" according to the following formula:

king - man + woman ≈ queen

The fact that this works has always seemed amazing to me, but it works even for fairly large documents, provided our embedding space is of sufficiently high dimension. With modern deep learning methods, you can get excellent embeddings of complex documents. For TerminusDB, we needed a way to leverage these sorts of embeddings for the following tasks that our users have asked for:

- Full-text search
- Entity resolution (finding other documents which are likely the same, for deduplication)
- Similarity search (for related content or for recommender systems)
- Clustering

We decided to prototype using OpenAI's embeddings, but in order to get the rest of the features, we required a vector database. We needed a few unusual features, including the ability to do incremental indexing and the ability to index on the basis of commits, so we know precisely what commit an index applies to. This allows us to put indexing into our CI workflows. A versioned open-source vector database doesn't exist in the wild. So we wrote one!

Writing a Vector Database

A vector database is a store of vectors with the ability to compare any two vectors using some metric. The metric can be a lot of different things, such as Euclidean distance, cosine similarity, taxicab geometry, or really anything that obeys the triangle inequality rules required to define a metric space. To make this fast, you need some sort of indexing structure to quickly find candidates that are already close for comparison. Otherwise, many operations will need to compare with every single thing in the database, every time. There are many approaches to indexing vector spaces, but we went with the HNSW (Hierarchical Navigable Small World) graph (see Malkov and Yashunin). HNSW is easy to understand and provides good performance in both low and high dimensions, so it is flexible. Most importantly, there is a very clear open-source implementation that we found: HNSW for Rust Computer Vision.

Storing the Vectors

Vectors are stored in a domain. This helps to separate different vector stores that do not need to describe the same vectors. For TerminusDB, we have many different commits that all pertain to the same vectors, so it's important that we put them all into the same domain.

[Diagram: a page-based vector store, where each page holds a range of numbered vectors.]

The vector store is page-based, where each buffer is designed to map cleanly to operating system pages but fit the vectors we use closely. We assign each vector an index, and then we can map from the index to the appropriate page and offset. Inside the HNSW index, we refer to a LoadedVec. This ensures that the page lives in a buffer that is currently loaded, so we can perform our metric comparisons on the vectors of interest.
As soon as the last LoadedVec drops from a buffer, the buffer can be added back into a buffer pool to be used to load a new page.

Creating a Versioned Index

We build an HNSW structure for each (domain + commit) pair. If starting a new index, we start with an empty HNSW. If starting an incremental index from a previous commit, we load the old HNSW from the previous commit and then begin our indexing operations. What is new versus what is old is all kept in TerminusDB, which knows how to find changes between commits and can submit them to the vector database indexer. The indexer only needs to know the operations it is being asked to perform (i.e., Insert, Delete, Replace). We maintain the indexes themselves in an LRU pool that allows us to load them on demand or use a cache if the index is already in memory. Since we only perform destructive operations at commits, this caching is always coherent. When we save the index, we serialize the structure with the raw vector index as a stand-in for the LoadedVec, which helps to keep the index small. In the future, we would like to use some of the tricks we have learned in TerminusDB to keep layers of an index around, so new layers can be added without requiring each incremental index to add a duplicate when serializing. However, the indexes have proved small enough compared to the vectors we are storing that it has not mattered much.

NOTE: While we currently do incremental indexing, we have yet to implement the delete and replace operations (there are only so many hours in a week!). I've read the HNSW literature, and deletion does not yet seem to be well described there. We have a design for the delete and replace operations that we think will work well with HNSW, and we want to explain it in case any technical people have ideas:

- If we are in an upper layer of the HNSW, simply ignore the deletion. It should not matter much, as most vectors are not in upper layers, and the ones that are, are only used for navigation.
- If we are in the zero layer but not in a layer above, delete the node from the index, while trying to relink all neighbors of the deleted node according to closeness.
- If we are in the zero layer but also in a layer above, mark the node as deleted, and use it for navigation but do not store it in the candidate pool.

Finding Embeddings

We use OpenAI to define our embeddings. After an indexing request is made to TerminusDB, we feed each of the documents to OpenAI, which returns lists of float vectors in JSON. It turns out that the embeddings are quite sensitive to context. We initially tried just submitting TerminusDB JSON documents, and the results were not fantastic. However, we found that if we define a GraphQL query + Handlebars template, we can create very high-quality embeddings.
For People in Star Wars, this pair, which is defined in our schema, looks like this:

JSON

{
  "embedding": {
    "query": "query($id: ID){ People(id : $id) { birth_year, created, desc, edited, eye_color, gender, hair_colors, height, homeworld { label }, label, mass, skin_colors, species { label }, url } }",
    "template": "The person's name is {{label}}.{{#if desc}} They are described with the following synopsis: {{#each desc}} *{{this}} {{/each}}.{{/if}}{{#if gender}} Their gender is {{gender}}.{{/if}}{{#if hair_colors}} They have the following hair colours: {{hair_colors}}.{{/if}}{{#if mass}} They have a mass of {{mass}}.{{/if}}{{#if skin_colors}} Their skin colours are {{skin_colors}}.{{/if}}{{#if species}} Their species is {{species.label}}.{{/if}}{{#if homeworld}} Their homeworld is {{homeworld.label}}.{{/if}}"
  }
}

The meaning of each field in the People object is rendered as text, which helps OpenAI understand what we mean, providing much better semantics. Ultimately, it would be nice if we could guess these sentences from a combination of our schema documentation and the schema structure, which is probably also possible using AI chat! But for now, this works brilliantly and does not require much technical sophistication.

Indexing Star Wars

So what happens when we actually run this thing? Well, we tried it out on our Star Wars data product to see what would happen. First, we fire off an index request, and our indexer obtains the information from TerminusDB:

curl 'localhost:8080/index?commit=o2uq7k1mrun1vp4urktmw55962vlpto&domain=admin/star_wars'

This returns a task-id which we can use to poll an endpoint for completion. The index file and vector files for the domain admin/star_wars and the commit o2uq7k1mrun1vp4urktmw55962vlpto come out as:

admin%[email protected]
admin%2Fstar_wars.vecs

We can now ask the semantic index server about our documents at the specified commit.

curl 'localhost:8080/search?commit=o2uq7k1mrun1vp4urktmw55962vlpto&domain=admin/star_wars' -d "Who are the squid people"

We get back a number of results as JSON which look like this:

JSON

[{"id":"terminusdb:///star-wars/Species/8","distance":0.09396297}, ...]

But what is the embedding string we used to produce this result? This is how the text was rendered for the Species/8 id:

"The species name is Mon Calamari. They have the following hair colours: none. Their skin colours are red, blue, brown, magenta. They speak the Mon Calamarian language."

Amazing! Notice that it never says squid anywhere! There is some pretty amazing work being done by our embeddings here. Let's have another go:

curl 'localhost:8080/search?commit=o2uq7k1mrun1vp4urktmw55962vlpto&domain=admin/star_wars' -d "Wise old man"

"The person's name is Yoda. They are described with the following synopsis: Yoda is a fictional character in the Star Wars franchise created by George Lucas, first appearing in the 1980 film The Empire Strikes Back. In the original films, he trains Luke Skywalker to fight against the Galactic Empire. In the prequel films, he serves as the Grand Master of the Jedi Order and as a high-ranking general of Clone Troopers in the Clone Wars. Following his death in Return of the Jedi at the age of 900, Yoda was the oldest living character in the Star Wars franchise in canon, until the introduction of Maz Kanata in Star Wars: The Force Awakens. Their gender is male. They have the following hair colours: white. They have a mass of 17. Their skin colours are green."

Incredible! While we do say "oldest" in the text, we don't say "wise" or "man"!
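Under the hood, results like these come down to distance computations between embedding vectors. The snippet below is not VectorLink's Rust implementation, just a small Python illustration (using numpy, with made-up vectors) of how ranking by cosine distance surfaces the semantically closest documents:

Python

import numpy as np

def cosine_distance(a, b):
    """Cosine distance = 1 - cosine similarity; smaller means more similar."""
    return 1.0 - np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

# Hypothetical embeddings: in practice these would come from OpenAI's embedding API
# and have far more dimensions.
documents = {
    "Species/8 (Mon Calamari)": np.array([0.91, 0.40, 0.10]),
    "Species/3 (Wookiee)":      np.array([0.20, 0.95, 0.30]),
}
query = np.array([0.88, 0.45, 0.12])  # pretend embedding of "Who are the squid people"

# Rank documents by distance to the query, closest first.
ranked = sorted(documents.items(), key=lambda kv: cosine_distance(query, kv[1]))
for doc_id, vec in ranked:
    print(doc_id, round(cosine_distance(query, vec), 5))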
I hope you can see how this could be helpful for you in getting high-quality semantic indexing of your data!

Conclusion

We have also added endpoints to find neighboring documents and to find duplicates that search the entire corpus. The latter was used on some benchmarks and has performed admirably. We hope to show the results of these experiments here soon. While there are really great vector databases out there in the wild, such as Pinecone, we want to have a sidecar that integrates well with TerminusDB and which can be used by less technical users who care primarily about content and are not going to be spinning up their own vector database. We are really excited about the potential of VectorLink, and would love people to have a look at what we have so far! Please forgive us a bit for the relatively sparse error handling. We're working furiously on it!
The Competing Consumers pattern is a powerful approach that enables multiple consumer applications to process messages from a shared queue in a parallel and distributed manner. The implementation of this messaging pattern requires a point-to-point channel. Senders deliver messages (or work items) to this channel, and consumers compete with each other to be the receiver and process the message (i.e., perform some work based on the contents of the message) in an asynchronous and non-blocking manner.

How Does It Work?

Usually, a message broker (e.g., RabbitMQ, ActiveMQ) or a distributed streaming platform (e.g., Apache Kafka) is needed in order to provide the necessary infrastructure to implement this pattern. Producer applications connect to the broker and push messages to a queue (or to a Kafka topic with multiple partitions implemented as point-to-point channels). Then, consumer applications that are also connected to the broker pull items from the queue (or belong to the same Kafka consumer group and pick up items from a designated partition). Each message is effectively delivered to only one available consumer, or remains in the queue until a consumer becomes available.

Error Handling

In case of an error during message processing, there are many things to consider which, depending on the use case, may complicate things. Many brokers support acknowledgments and transactions, while others rely on each microservice or application to implement its own logic with its own choice of technology, communication paradigm, and error handling. If a consumer fails to send back an ACK in a timely manner after receiving and processing a message, or rolls back the transaction when an exception occurs, the message may automatically (or based on application logic) be requeued or end up in a dead letter queue (DLQ). However, there are always corner cases and scenarios that need attention. For example, if the consumer process has performed some work and then, for some reason, "hangs", or there is a network error, then reprocessing the message may not be 100% safe. It depends on how idempotent the consumer's processing logic is, and, depending on the use case, a manual inspection may be needed in order to decide whether requeueing the message or processing it from a DLQ is OK. This is why we often run into delivery guarantee semantics like "at most once," "at least once," or "exactly once." The latter is the ideal, but in practice it is technically impossible. Therefore, aiming to achieve effectively-once processing (i.e., as close as possible to "exactly once") should be our realistic goal.

In case of errors on the producer side, or if the message broker is unavailable, things are theoretically a bit less complicated. A recommended pattern to use on this side is the transactional outbox pattern. It involves storing messages in an "outbox" table or collection within the system's database and using a background process to periodically look up and deliver these messages. Only after the broker confirms it has received a message is it considered sent.

Alternatives

Based on all the above, the existence of a central server that acts as the messaging broker seems essential. In fact, for production systems, we will need to set up a cluster consisting of multiple broker instances, running on separate VMs, to ensure high availability, failover, replication, and so on.
If for some reason this is not possible (operational costs, timelines, learning curve, etc.), other solutions exist that, depending on the use case, may cover our needs. For simple cases, one could utilize a database table as a "queue" and have multiple consumer instances polling the database periodically. If they find a record in a status appropriate for processing, they have to apply a locking mechanism in order to make sure no other instance will duplicate the work. Upon finishing the task, they need to update the status of the record and release the lock. Another solution is the one that we will demonstrate in the following sections. It relies on using Hazelcast in embedded mode and taking advantage of the distributed data structures it provides, namely the distributed queue.

Solution Description

For our demonstration, we will use two microservices implemented with Spring Boot: the Producer and the Consumer. Multiple instances of each can be executed. Hazelcast is included as a dependency in both of them, along with some basic Java configuration. As mentioned, Hazelcast will run in embedded mode, meaning that there will be no Hazelcast server. The running instances, upon startup, will use a discovery mechanism and form a decentralized cluster, or Hazelcast IMDG (in-memory data grid). Hazelcast can also be used in the classic client-server mode. Each approach has its advantages and disadvantages; you may refer to Choosing an Application Topology for more details. We have chosen to configure the Producers as Hazelcast "Members" and the Consumers as Hazelcast "Clients." Let's have a look at the characteristics of each:

Hazelcast Client: A Hazelcast client is an application that connects to a Hazelcast cluster to interact with it. It acts as a lightweight gateway or proxy to the Hazelcast cluster, allowing clients to access and utilize the distributed data structures and services provided by Hazelcast. The client does not store or own data; it simply communicates with the Hazelcast cluster to perform operations on distributed data. Clients can be deployed independently of the Hazelcast cluster and can connect to multiple Hazelcast clusters simultaneously. Clients are typically used for read and write operations, as well as for executing distributed tasks on the Hazelcast cluster. Clients can be implemented in various programming languages (Java, .NET, C++, Python, etc.) using the Hazelcast client libraries provided.

Hazelcast Member: A Hazelcast member refers to an instance of Hazelcast that is part of the Hazelcast cluster and participates in the distributed data grid. Members store data, participate in the cluster's data distribution and replication, and provide distributed processing capabilities. Members join together to form a Hazelcast cluster, where they communicate and collaborate to ensure data consistency and fault tolerance. Members can be deployed on different machines or containers, forming a distributed system. Each member holds a subset of the data in the cluster, and data is distributed across members using Hazelcast's partitioning strategy. Members can also execute distributed tasks and provide distributed caching and event-driven capabilities.

Deciding which microservice will be a Member vs. a simple Client again depends on multiple factors, such as data volume, processing requirements, network communication, resource utilization, and the possible restrictions of your system.
For our example, here's the diagram with the selected architecture:

[Demo Architecture Diagram]

On the Producer side, which is a Hazelcast Member, we have enabled Queueing with Persistent Datastore. The datastore of choice is a PostgreSQL database. In the following sections, we dive into the implementation details of the Producer and the Consumer. You can find the complete source code over on the GitHub repository. Note that it includes a docker-compose file for starting the PostgreSQL database and the Hazelcast Management Center, which we will also describe later on.

The Producer

The Producer's Hazelcast configuration is the following:

Java

@Configuration
public class HazelcastConfig {

    @Value("${demo.queue.name}")
    private String queueName;

    @Value("${demo.dlq.name}")
    private String dlqName;

    @Bean
    public HazelcastInstance hazelcastInstance(QueueStore<Event> queueStore) {
        Config config = new Config();

        QueueConfig queueConfig = config.getQueueConfig("main");
        queueConfig.setName(queueName)
                .setBackupCount(1)
                .setMaxSize(0)
                .setStatisticsEnabled(true);
        queueConfig.setQueueStoreConfig(new QueueStoreConfig()
                .setEnabled(true)
                .setStoreImplementation(queueStore)
                .setProperty("binary", "false"));

        QueueConfig dlqConfig = config.getQueueConfig("main-dlq");
        dlqConfig.setName(dlqName)
                .setBackupCount(1)
                .setMaxSize(0)
                .setStatisticsEnabled(true);

        config.addQueueConfig(queueConfig);
        config.addQueueConfig(dlqConfig);

        return Hazelcast.newHazelcastInstance(config);
    }
}

We define two queues:

- The "main" queue, which will hold the messages (Events) to be processed and is backed by the PostgreSQL QueueStore
- A "main-dlq," which will be used as a DLQ, and to which consumers will push items if they encounter errors

To keep it shorter, the DLQ is not backed by a persistent store. For a full explanation of configuration options and recommendations, refer to Hazelcast's Configuring Queue documentation. Our PostgreSQL QueueStore implements the QueueStore<T> interface, and we inject a Spring Data JPA repository for this purpose:

Java

@Component
@Slf4j
public class PostgresQueueStore implements QueueStore<Event> {

    private final EventRepository eventRepository;

    public PostgresQueueStore(@Lazy EventRepository eventRepository) {
        this.eventRepository = eventRepository;
    }

    @Override
    public void store(Long key, Event value) {
        log.info("store() called for {}", value);
        EventEntity entity = new EventEntity(key, value.getRequestId().toString(),
                value.getStatus().name(), value.getDateSent(), LocalDateTime.now());
        eventRepository.save(entity);
    }

    @Override
    public void delete(Long key) {
        log.info("delete() was called for {}", key);
        eventRepository.deleteById(key);
    }

    // rest of the methods omitted for brevity...
}

For testing purposes, the Producer includes a REST controller with a single endpoint. Once invoked, it creates an Event object with a random UUID and offers it to the main queue:

Java

@RestController
public class ProducerController {

    @Value("${demo.queue.name}")
    private String queueName;

    private final HazelcastInstance hazelcastInstance;

    public ProducerController(HazelcastInstance hazelcastInstance) {
        this.hazelcastInstance = hazelcastInstance;
    }

    private BlockingQueue<Event> retrieveQueue() {
        return hazelcastInstance.getQueue(queueName);
    }

    @PostMapping("/")
    public Boolean send() {
        return retrieveQueue().offer(Event.builder()
                .requestId(UUID.randomUUID())
                .status(Status.APPROVED)
                .dateSent(LocalDateTime.now())
                .build());
    }
}

You can create a script or use a load-generator tool to perform more extensive testing.
You may also start multiple instances of the Producer microservice and monitor the cluster's behavior via the logs and the Management Console.

The Consumer

Let's start again with the Consumer's Hazelcast configuration. This time we are dealing with a Hazelcast client:

Java

@Configuration
public class HazelcastConfig {

    @Value("${demo.queue.name}")
    private String queueName;

    @Value("${demo.dlq.name}")
    private String dlqName;

    @Bean
    public HazelcastInstance hazelcastInstance() {
        ClientConfig config = new ClientConfig();
        // add more settings per use case
        return HazelcastClient.newHazelcastClient(config);
    }

    @Bean
    public IQueue<Event> eventQueue(HazelcastInstance hazelcastInstance) {
        return hazelcastInstance.getQueue(queueName);
    }

    @Bean
    public IQueue<Event> dlq(HazelcastInstance hazelcastInstance) {
        return hazelcastInstance.getQueue(dlqName);
    }
}

This time we are calling the HazelcastClient.newHazelcastClient() method, in contrast to Hazelcast.newHazelcastInstance() on the Producer side. We are also creating two beans corresponding to the queues we will need access to, i.e., the main queue and the DLQ.

Now, let's have a look at the message consumption part. The Hazelcast library does not offer a pure event-based listener approach (see the related GitHub issue). The only way of consuming a new queue entry is through the blocking queue.take() method (usually contained in a do...while loop). To make this more "resource-friendly" and to avoid blocking the main thread of our Spring-based Java application, we will perform this functionality in background virtual threads:

Java

@Component
@Slf4j
public class EventConsumer {

    @Value("${demo.virtual.threads}")
    private int numberOfThreads = 1;

    @Value("${demo.queue.name}")
    private String queueName;

    private final IQueue<Event> queue;
    private final DemoService demoService;
    private final FeatureManager manager;

    public static final Feature CONSUMER_ENABLED = new NamedFeature("CONSUMER_ENABLED");

    public EventConsumer(@Qualifier("eventQueue") IQueue<Event> queue,
                         DemoService demoService,
                         FeatureManager manager) {
        this.queue = queue;
        this.demoService = demoService;
        this.manager = manager;
    }

    @PostConstruct
    public void init() {
        startConsuming();
    }

    public void startConsuming() {
        for (var i = 0; i < numberOfThreads; i++) {
            Thread.ofVirtual()
                    .name(queueName + "_" + "consumer-" + i)
                    .start(this::consumeMessages);
        }
    }

    private void consumeMessages() {
        while (manager.isActive(CONSUMER_ENABLED)) {
            try {
                Event event = queue.take(); // Will block until an event is available or interrupted
                // Process the event
                log.info("EventConsumer::: Processing {} ", event);
                demoService.doWork(event);
            } catch (InterruptedException e) {
                // Handle InterruptedException as per your application's requirements
                log.error("Encountered thread interruption: ", e);
                Thread.currentThread().interrupt();
            }
        }
    }
}

Depending on the nature of the processing we need and the desired throughput, we can have a multi-threaded EventConsumer by specifying the desired number of virtual threads in the configuration file. The important thing to note here is that because the Hazelcast queue is like a distributed version of java.util.concurrent.BlockingQueue, we are assured that each item will be taken from the queue by a single consumer, even if multiple instances are running on different JVMs.
The actual "work" is delegated to a DemoService, which has a single @Retryable method:

Java

@Retryable(retryFor = Exception.class, maxAttempts = 2)
@SneakyThrows
// @Async: enable this if you want to delegate processing to a ThreadPoolTaskExecutor
public void doWork(Event event) {
    // do the actual processing...
    EventProcessedEntity processed = new EventProcessedEntity();
    processed.setRequestId(event.getRequestId().toString());
    processed.setStatus(event.getStatus().name());
    processed.setDateSent(event.getDateSent());
    processed.setDateProcessed(LocalDateTime.now());
    processed.setProcessedBy(InetAddress.getLocalHost().getHostAddress() + ":"
            + webServerAppCtxt.getWebServer().getPort());
    eventProcessedRepository.save(processed);
}

@Recover
void recover(Exception e, Event event) {
    log.error("Error processing {} ", event, e);
    var result = dlq.offer(event);
    log.info("Adding {} in DLQ result {}", event, result);
}

If the method fails for a configurable number of retries, the @Recover annotated method is fired and the event is delegated to the DLQ. The demo does not include an implementation for the manipulation of DLQ messages. One more thing to add here is that we could annotate this method with @Async and configure a ThreadPoolTaskExecutor (or a SimpleAsyncTaskExecutor, for which Spring will soon add first-class configuration options for virtual threads; check here for more info). This way, the consumption of messages would be less blocking if the processing task (i.e., the actual work) is "heavier."

At this point, and regarding error handling, we need to mention that Hazelcast also supports transactions. For more information about this capability, you may check the official documentation here. You could easily modify the Consumer's code in order to try a transactional approach instead of the existing error handling.

Testing and Monitoring

As mentioned above, once executed, our docker-compose file will start the PostgreSQL database (with the necessary tables in place) and the Hazelcast Management Center, which you can access at http://localhost:8080.

[Hazelcast Management Console]

You can start multiple instances of Producers and Consumers and monitor them via the console. At the same time, you may start producing some traffic and confirm the load balancing and parallelization achieved with our competing consumers. Moreover, the Consumer application includes the Chaos Monkey dependency as well, which you can use to introduce various exceptions, delays, restarts, etc., and observe the system's behavior.

Wrap Up

To wrap it up, competing consumers with Hazelcast and Spring Boot is like a friendly race among hungry microservices for delicious messages. It's a recipe for efficient and scalable message processing, so let the testing begin, and may the fastest consumer prevail!
Exploratory Data Analysis (EDA) is an essential step in any data science project, as it allows us to understand the data, detect patterns, and identify potential issues. In this article, we will explore how to use two popular Python libraries, Pandas and Matplotlib, to perform EDA. Pandas is a powerful library for data manipulation and analysis, while Matplotlib is a versatile library for data visualization. We will cover the basics of loading data into a pandas DataFrame, exploring the data using pandas functions, cleaning the data, and finally, visualizing the data using Matplotlib. By the end of this article, you will have a solid understanding of how to use Pandas and Matplotlib to perform EDA in Python.

Importing Libraries and Data

Importing Libraries

To use the pandas and Matplotlib libraries in your Python code, you need to first import them. You can do this using the import statement followed by the name of the library.

Python

import pandas as pd
import matplotlib.pyplot as plt

In this example, we're importing pandas and aliasing it as 'pd', which is a common convention in the data science community. We're also importing matplotlib.pyplot and aliasing it as 'plt'. By importing these libraries, we can use their functions and methods to work with data and create visualizations.

Loading Data

Once you've imported the necessary libraries, you can load the data into a pandas DataFrame. Pandas provides several methods to load data from various file formats, including CSV, Excel, JSON, and more. The most common method is read_csv, which reads data from a CSV file and returns a DataFrame.

Python

# Load data into a pandas DataFrame
data = pd.read_csv('path/to/data.csv')

In this example, we're loading data from a CSV file located at 'path/to/data.csv' and storing it in a variable called 'data'. You can replace 'path/to/data.csv' with the actual path to your data file. By loading data into a pandas DataFrame, we can easily manipulate and analyze the data using pandas' functions and methods. The DataFrame is a 2-dimensional, table-like data structure that allows us to work with data in a structured and organized way. It provides functions for selecting, filtering, grouping, aggregating, and visualizing data.

Data Exploration

head() and tail()

The head() and tail() functions are used to view the first few and last few rows of the data, respectively. By default, these functions display the first/last five rows of the data, but you can specify a different number of rows as an argument.

Python

# View the first 5 rows of the data
print(data.head())

# View the last 10 rows of the data
print(data.tail(10))

info()

The info() function provides information about the DataFrame, including the number of rows and columns, the data types of each column, and the number of non-null values. This function is useful for identifying missing values and determining the appropriate data types for each column.

Python

# Get information about the data
print(data.info())

describe()

The describe() function provides summary statistics for numerical columns in the DataFrame, including the count, mean, standard deviation, minimum, maximum, and quartiles. This function is useful for getting a quick overview of the distribution of the data.

Python

# Get summary statistics for the data
print(data.describe())

value_counts()

The value_counts() function is used to count the number of occurrences of each unique value in a column.
This function is useful for identifying the frequency of specific values in the data.

Python

# Count the number of occurrences of each unique value in a column
print(data['column_name'].value_counts())

These are just a few examples of pandas functions you can use to explore data. There are many other functions you can use depending on your specific data exploration needs, such as isnull() to check for missing values, groupby() to group data by a specific column, corr() to calculate correlation coefficients between columns, and more.

Data Cleaning

isnull()

The isnull() function is used to check for missing or null values in the DataFrame. It returns a DataFrame of the same shape as the original, with True values where the data is missing and False values where the data is present. You can use the sum() function to count the number of missing values in each column.

Python

# Check for missing values
print(data.isnull().sum())

dropna()

The dropna() function is used to remove rows or columns with missing or null values. By default, this function removes any row that contains at least one missing value. You can use the subset argument to specify which columns to check for missing values and the how argument to specify whether to drop rows with any missing values or only rows where all values are missing.

Python

# Drop rows with missing values
data = data.dropna()

drop_duplicates()

The drop_duplicates() function is used to remove duplicate rows from the DataFrame. By default, this function removes all rows that have the same values in all columns. You can use the subset argument to specify which columns to check for duplicates.

Python

# Drop duplicate rows
data = data.drop_duplicates()

replace()

The replace() function is used to replace values in a column with new values. You specify the old value to replace and the new value to replace it with. This function is useful for handling data quality issues such as misspellings or inconsistent formatting.

Python

# Replace values in a column
data['column_name'] = data['column_name'].replace('old_value', 'new_value')

These are just a few examples of pandas functions you can use to clean data. There are many other functions you can use depending on your specific data-cleaning needs, such as fillna() to fill missing values with a specific value or method, astype() to convert the data types of columns, clip() to trim outliers, and more. Data cleaning plays a crucial role in preparing data for analysis, and automating the process can save time and ensure data quality. In addition to the pandas functions mentioned earlier, automation techniques can be applied to streamline data-cleaning workflows. For instance, you can create reusable functions or pipelines to handle missing values, drop duplicates, and replace values across multiple datasets. Moreover, you can leverage advanced techniques like imputation to fill in missing values intelligently, or regular expressions to identify and correct inconsistent formatting. By combining the power of pandas functions with automation strategies, you can efficiently clean and standardize data, improving the reliability and accuracy of your exploratory data analysis (EDA).

Data Visualization

Data visualization is a critical component of data science, as it allows us to gain insights from data quickly and easily. Matplotlib is a popular Python library for creating a wide range of data visualizations, including scatter plots, line plots, bar charts, histograms, box plots, and more.
Here are a few examples of how to create these types of visualizations using Matplotlib:

Scatter Plot

A scatter plot is used to visualize the relationship between two continuous variables. You can create a scatter plot in Matplotlib using the scatter() function.

Python

# Create a scatter plot
plt.scatter(data['column1'], data['column2'])
plt.xlabel('Column 1')
plt.ylabel('Column 2')
plt.show()

In this example, we're creating a scatter plot with column1 on the x-axis and column2 on the y-axis. We're also adding labels to the x-axis and y-axis using the xlabel() and ylabel() functions.

Histogram

A histogram is used to visualize the distribution of a single continuous variable. You can create a histogram in Matplotlib using the hist() function.

Python

# Create a histogram
plt.hist(data['column'], bins=10)
plt.xlabel('Column')
plt.ylabel('Frequency')
plt.show()

In this example, we're creating a histogram of the column variable with 10 bins. We're also adding labels to the x-axis and y-axis using the xlabel() and ylabel() functions.

Box Plot

A box plot is used to visualize the distribution of a single continuous variable and to identify outliers. You can create a box plot in Matplotlib using the boxplot() function.

Python

# Create a box plot
plt.boxplot(data['column'])
plt.ylabel('Column')
plt.show()

In this example, we're creating a box plot of the column variable. We're also adding a label to the y-axis using the ylabel() function. These are just a few examples of what you can do with Matplotlib for data visualization. There are many other functions and techniques you can use, depending on the specific requirements of your project.

Conclusion

Exploratory data analysis (EDA) is a crucial step in any data science project, and Python provides powerful tools to perform EDA effectively. In this article, we have learned how to use two popular Python libraries, Pandas and Matplotlib, to load, explore, clean, and visualize data. Pandas provides a flexible and efficient way to manipulate and analyze data, while Matplotlib provides a wide range of options to create visualizations. By leveraging these two libraries, we can gain insights from data quickly and easily. With the skills and techniques learned in this article, you can start performing EDA on your own datasets and uncover valuable insights that can drive data-driven decision-making.
As technology professionals, we are already aware that our world is increasingly data-driven. This is especially true in the realm of financial markets, where algorithmic trading has become the norm, leveraging complex algorithms to execute trades at speeds and frequencies that far outstrip human capabilities. In this world where milliseconds can mean the difference between profit and loss, algorithmic trading provides an edge by making trading more systematic and less influenced by human emotional biases. But what if we could take this a step further? What if our trading algorithms could learn from their mistakes, adapt to new market conditions, and continually improve their performance over time? This is where reinforcement learning, a cutting-edge field in artificial intelligence, comes into play.

Reinforcement learning (RL) is an area of machine learning that is focused on making decisions. It is about learning from interaction with an environment to achieve a goal, often formulated as a game where the RL agent learns to make moves that maximize its total reward. It is a technology that is now being applied to a variety of problems, from self-driving cars to resource allocation in computer networks. But reinforcement learning's potential remains largely untapped in the world of algorithmic trading. This is surprising, given that trading is essentially a sequential decision-making problem, which is exactly what reinforcement learning is designed to handle. In this article, we will delve into how reinforcement learning can enhance algorithmic trading, explore the challenges involved, and discuss the future of this exciting intersection of AI and finance. Whether you're a data scientist interested in applying your skills to financial markets or a technology enthusiast curious about the practical applications of reinforcement learning, this article has something for you.

Understanding Algorithmic Trading

Algorithmic trading, also known as algo-trading or black-box trading, utilizes complex formulas and high-speed, computer-programmed instructions to execute large orders in financial markets with minimal human intervention. It is a practice that has revolutionized the finance industry and is becoming increasingly prevalent in today's digital age. At its core, algorithmic trading is about making the trading process more systematic and efficient. It involves the use of sophisticated mathematical models to make lightning-fast decisions about when, how, and what to trade. This ability to execute trades at high speeds and high volumes offers significant advantages, including a reduced risk of manual errors, improved order execution speed, and the ability to backtest trading strategies on historical data.

In addition, algorithmic trading can implement complex strategies that would be impossible for humans to execute manually. These strategies can range from statistical arbitrage (exploiting statistical patterns in prices) to mean reversion (capitalizing on price deviations from long-term averages). An important aspect of algorithmic trading is that it removes emotional human influences from the trading process. Decisions are made based on pre-set rules and models, eliminating the potential for human biases or emotions to interfere with trading decisions. This can lead to more consistent and predictable trading outcomes. However, as powerful as algorithmic trading is, it is not without its challenges. One of the primary difficulties lies in the development of effective trading algorithms.
These algorithms must be robust enough to handle a wide range of market conditions and flexible enough to adapt to changing market dynamics. They also need to be able to manage risk effectively, a task that becomes increasingly challenging as the speed and volume of trading increase. This is where reinforcement learning can play a critical role. With its ability to learn from experience and adapt its strategies over time, reinforcement learning offers a promising solution to the challenges faced by traditional algorithmic trading strategies. In the next section, we will delve deeper into the principles of reinforcement learning and how they can be applied to algorithmic trading.

The Basics of Reinforcement Learning

Reinforcement learning (RL) is a subfield of artificial intelligence that focuses on decision-making processes. In contrast to other forms of machine learning, reinforcement learning models learn by interacting with their environment and receiving feedback in the form of rewards or penalties. The fundamental components of a reinforcement learning system are the agent, the environment, states, actions, and rewards. The agent is the decision-maker, the environment is what the agent interacts with, states are the situations the agent finds itself in, actions are what the agent can do, and rewards are the feedback the agent gets after taking an action.

One key concept in reinforcement learning is the idea of exploration vs. exploitation. The agent needs to balance exploring the environment to find new information against exploiting the knowledge it already has to maximize its rewards. This is known as the exploration-exploitation tradeoff. Another important aspect of reinforcement learning is the concept of a policy. A policy is the strategy that the agent follows when deciding on an action from a particular state. The goal of reinforcement learning is to find the optimal policy, which maximizes the expected cumulative reward over time.

Reinforcement learning has been successfully applied in various fields, from game playing (like the famous AlphaGo) to robotics (for teaching robots new tasks). Its power lies in its ability to learn from trial and error and improve its performance over time. In the context of algorithmic trading, the financial market can be considered the environment, the trading algorithm the agent, the market conditions the states, the trading decisions (buy, sell, hold) the actions, and the profit or loss from the trades the rewards. Applying reinforcement learning to algorithmic trading means developing trading algorithms that can learn and adapt their trading strategies based on feedback from the market, with the aim of maximizing cumulative profit. However, implementing reinforcement learning in trading comes with its own unique challenges, which we will explore in the following sections.

The Intersection of Algorithmic Trading and Reinforcement Learning

The intersection of algorithmic trading and reinforcement learning represents an exciting frontier in the field of financial technology. At its core, the idea is to create trading algorithms that can learn from past trades and iteratively improve their trading strategies over time. In a typical reinforcement learning setup for algorithmic trading, the agent (the trading algorithm) interacts with the environment (the financial market) by executing trades (actions) based on the current market conditions (state), as sketched in the toy example below.
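The following Python sketch illustrates that agent-environment loop. Everything here is invented for illustration: the simulated prices, the crude state encoding, and the random placeholder policy. It is not a production trading system or any particular library's API, just the shape of the loop a learning agent would sit inside.

Python

import random

# Toy illustration of the agent-environment loop described above.
ACTIONS = ["buy", "sell", "hold"]

def get_state(prices):
    """Crude state: direction of the last price move."""
    return "up" if prices[-1] >= prices[-2] else "down"

def choose_action(state):
    """Placeholder policy: act at random. A real RL agent would learn this mapping."""
    return random.choice(ACTIONS)

prices = [100.0, 100.5]   # simulated price history
cash, position = 0.0, 0.0

for step in range(100):
    state = get_state(prices)
    action = choose_action(state)

    # Execute the trade at the current price.
    if action == "buy":
        position += 1
        cash -= prices[-1]
    elif action == "sell":
        position -= 1
        cash += prices[-1]

    value_before = cash + position * prices[-1]
    prices.append(prices[-1] * (1 + random.gauss(0, 0.01)))  # the market moves
    value_after = cash + position * prices[-1]

    reward = value_after - value_before  # profit or loss from this step
    # A learning algorithm would use (state, action, reward, next_state) here to update its policy.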
The result of these trades, in terms of profit or loss, serves as the reward or penalty, guiding the algorithm to adjust its strategy. One of the key advantages of reinforcement learning in this context is its ability to adapt to changing market conditions. Financial markets are notoriously complex and dynamic, with prices affected by a myriad of factors, from economic indicators to geopolitical events. A trading algorithm that can learn and adapt in real-time has a significant advantage over static algorithms. For example, consider a sudden market downturn. A static trading algorithm might continue executing trades based on its pre-programmed strategy, potentially leading to significant losses. In contrast, a reinforcement learning-based algorithm could recognize the change in market conditions and adapt its strategy accordingly, potentially reducing losses or even taking advantage of the downturn to make profitable trades. Another advantage of reinforcement learning in trading is its ability to handle high-dimensional data and make decisions based on complex, non-linear relationships. This is especially relevant in today's financial markets, where traders have access to vast amounts of data, from price histories to social media sentiment. For instance, a reinforcement learning algorithm could be trained to take into account not just historical price data, but also other factors such as trading volume, volatility, and even news articles or tweets, to make more informed trading decisions. Challenges and Solutions of Implementing Reinforcement Learning in Algorithmic Trading While the potential benefits of using reinforcement learning in algorithmic trading are significant, it's also important to understand the challenges and complexities associated with its implementation. Overcoming the Curse of Dimensionality The curse of dimensionality refers to the exponential increase in computational complexity as the number of features (dimensions) in the dataset grows. For a reinforcement learning model in trading, each dimension could represent a market factor or indicator, and the combination of all these factors constitutes the state space, which can become enormous. One approach to mitigating the curse of dimensionality is through feature selection, which involves identifying and selecting the most relevant features for the task at hand. By reducing the number of features, we can effectively shrink the state space, making the learning problem more tractable. Python from sklearn.feature_selection import SelectKBest, mutual_info_regression # Assume X is the feature matrix, and y is the target variable k = 10 # Number of top features to select selector = SelectKBest(mutual_info_regression, k=k) X_reduced = selector.fit_transform(X, y) Another approach is dimensionality reduction, such as Principal Component Analysis (PCA) or t-distributed Stochastic Neighbor Embedding (t-SNE). These techniques transform the original high-dimensional data into a lower-dimensional space, preserving as much of the important information as possible. Python from sklearn.decomposition import PCA # Assume X is the feature matrix n_components = 5 # Number of principal components to keep pca = PCA(n_components=n_components) X_reduced = pca.fit_transform(X) Handling Uncertainty and Noise Financial markets are inherently noisy and unpredictable, with prices influenced by numerous factors. To address this, we can incorporate techniques that manage uncertainty into our reinforcement learning model. 
For example, Bayesian methods can be used to represent and manipulate uncertainties in the model. Additionally, reinforcement learning algorithms like Q-learning and SARSA can be used, which learn an action-value function and are known to handle environments with a high degree of uncertainty. Preventing Overfitting Overfitting happens when a model becomes too specialized to the training data and performs poorly on unseen data. Regularization techniques, such as L1 and L2 regularization, can help prevent overfitting by penalizing overly complex models. Python from sklearn.linear_model import Ridge # Assume X_train and y_train are the training data alpha = 0.5 # Regularization strength ridge = Ridge(alpha=alpha) ridge.fit(X_train, y_train) Another way to prevent overfitting is through the use of validation sets and cross-validation. By regularly evaluating the model's performance on a separate validation set during the training process, we can keep track of how well the model is generalizing to unseen data. Python from sklearn.model_selection import cross_val_score from sklearn.linear_model import LinearRegression # Assume X and y are the feature matrix and target variable model = LinearRegression() cv_scores = cross_val_score(model, X, y, cv=5) # 5-fold cross-validation Balancing Exploration and Exploitation Striking the right balance between exploration (trying out new actions) and exploitation (sticking to known actions) is a key challenge in reinforcement learning. Several strategies can be used to manage this tradeoff. One common approach is the epsilon-greedy strategy, where the agent mostly takes the action that it currently thinks is best (exploitation), but with a small probability (epsilon), it takes a random action (exploration). Python import numpy as np def epsilon_greedy(Q, state, n_actions, epsilon): if np.random.random() < epsilon: return np.random.randint(n_actions) # Exploration: choose a random action else: return np.argmax(Q[state]) # Exploitation: choose the action with the highest Q-value Another approach is the Upper Confidence Bound (UCB) method, where the agent chooses actions based on an upper bound of the expected reward, encouraging exploration of actions with high potential. Python import numpy as np import math def ucb_selection(plays, rewards, t): n_arms = len(plays) ucb_values = [0] * n_arms for i in range(n_arms): if plays[i] == 0: ucb_values[i] = float('inf') else: ucb_values[i] = rewards[i] / plays[i] + math.sqrt(2 * math.log(t) / plays[i]) return np.argmax(ucb_values) Future Perspectives The intersection of reinforcement learning and algorithmic trading is a burgeoning field, and while it's already showing promise, there are several exciting developments on the horizon. One of the most prominent trends is the increasing use of deep reinforcement learning, which combines the decision-making capabilities of reinforcement learning with the pattern recognition capabilities of deep learning. Deep reinforcement learning has the potential to handle much more complex decision-making tasks, making it especially suited to the intricacies of financial markets. We can also expect to see more sophisticated reward structures in reinforcement learning models. Current models often use simple reward structures, such as profit or loss from a trade. However, future models could incorporate more nuanced rewards, taking into account factors such as risk, liquidity, and transaction costs. This would allow for the development of more balanced and sustainable trading strategies. 
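To make these ideas a bit more concrete, here is a minimal, hypothetical sketch that combines the tabular Q-learning mentioned above with a cost-aware reward signal of the kind just described. The state discretization, action set, and cost parameters are illustrative assumptions, not a production trading setup.
Python
import numpy as np

n_states, n_actions = 100, 3       # assumption: discretized market states; actions = sell, hold, buy
Q = np.zeros((n_states, n_actions))
alpha, gamma = 0.1, 0.99           # learning rate and discount factor

def nuanced_reward(pnl, traded_notional, cost_rate=0.001, risk_penalty=0.0):
    # Net out transaction costs (and an optional risk term) instead of rewarding raw P&L
    return pnl - cost_rate * abs(traded_notional) - risk_penalty

def q_update(state, action, reward, next_state):
    # One-step Q-learning update toward the temporal-difference target
    td_target = reward + gamma * np.max(Q[next_state])
    Q[state, action] += alpha * (td_target - Q[state, action])
In practice, the action taken at each step would come from a strategy such as the epsilon-greedy or UCB functions shown earlier, and the reward shaping would be tuned to the strategy's risk appetite. 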
Another intriguing prospect is the use of reinforcement learning for portfolio management. Instead of making decisions about individual trades, reinforcement learning could be used to manage a portfolio of assets, deciding what proportion of the portfolio to allocate to each asset in order to maximize returns and manage risk. In terms of research, there's a lot of ongoing work aimed at overcoming the challenges associated with reinforcement learning in trading. For instance, researchers are exploring methods to manage the exploration-exploitation tradeoff more effectively, to deal with the curse of dimensionality, and to prevent overfitting. In conclusion, while reinforcement learning in algorithmic trading is still a relatively new field, it holds immense potential. By continuing to explore and develop this technology, we could revolutionize algo-trading, making it more efficient, adaptable, and profitable. As technology professionals, we have the exciting opportunity to be at the forefront of this revolution.
AWS Lambda is a popular serverless platform that allows developers to run code without provisioning or managing servers. In this article, we will discuss how to implement a serverless DevOps pipeline using AWS Lambda and CodePipeline. What Is AWS Lambda? AWS Lambda is a computing service that runs code in response to events and automatically scales to meet the demand of the application. Lambda supports several programming languages, including Node.js, Python, Java, Go, and C#. CodePipeline is a continuous delivery service that automates the build, test, and deployment of applications. CodePipeline integrates seamlessly with other AWS services, such as CodeCommit, CodeBuild, CodeDeploy, and Lambda. Creation of the Lambda Function To implement a serverless DevOps pipeline, we first need to create a Lambda function that will act as a build step in CodePipeline. The Lambda function will be responsible for building the application code and creating a deployment package. The deployment package will be stored in an S3 bucket, which will be used as an input artifact for the deployment step. Implementing the CodePipeline The next step is to create a CodePipeline pipeline that will orchestrate the build, test, and deployment process. The pipeline will consist of three stages: Source, Build, and Deploy. The Source stage will pull the application code from a Git repository, such as CodeCommit. The Build stage will invoke the Lambda function to build the application code and create a deployment package. The Deploy stage will deploy the application to a target environment, such as an EC2 instance or a Lambda function. The Build Stage In the Build stage, the Lambda function will be triggered by a CodePipeline event. The event will contain information about the source code, such as the Git commit ID and the branch name. The Lambda function will use this information to fetch the source code from the Git repository and build the application. The Lambda function will then create a deployment package, which will be stored in an S3 bucket. The deployment package will contain the application code, as well as any dependencies, configuration files, and scripts required to deploy the application. The Deploy Stage In the Deploy stage, we will use AWS CodeDeploy to deploy the application to a target environment. CodeDeploy is a deployment service that automates the deployment of applications to Amazon EC2 instances, Lambda functions, or on-premises servers. CodeDeploy uses deployment groups to deploy applications to one or more instances in a target environment. The deployment group can be configured to perform rolling deployments, blue/green deployments, or custom deployment strategies. Using CodeDeploy We can use CodeDeploy to deploy the application to a Lambda function by creating a deployment group that targets the Lambda function. The deployment group can be configured to use the deployment package created in the Build stage as the input artifact. CodeDeploy will then create a new version of the Lambda function and update the alias to point to the new version. This will ensure that the new version is deployed gradually and that the old version is still available until the new version is fully deployed. Conclusion In conclusion, implementing a serverless DevOps pipeline with AWS Lambda and CodePipeline can help to streamline the software delivery process, reduce costs and improve scalability. 
By using Lambda as a build step in CodePipeline, we can automate the build process and create deployment packages that can be easily deployed to a target environment using CodeDeploy. With continuous delivery, we can ensure that new features and bug fixes are delivered to customers quickly and reliably.
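To make the Build stage described above more concrete, here is a minimal sketch of a Lambda handler invoked by CodePipeline. The bucket name, object key, and build_deployment_package helper are placeholders for your own build logic; only the CodePipeline job-result calls reflect the actual boto3 API.
Python
import boto3

codepipeline = boto3.client("codepipeline")
s3 = boto3.client("s3")

def handler(event, context):
    # CodePipeline passes the job details in the invocation event
    job_id = event["CodePipeline.job"]["id"]
    try:
        # Placeholder: fetch the sources, run the build, and produce a zip archive
        package_path = build_deployment_package()
        # Store the deployment package in S3 as the input artifact for the Deploy stage
        s3.upload_file(package_path, "my-artifact-bucket", "builds/app.zip")
        codepipeline.put_job_success_result(jobId=job_id)
    except Exception as exc:
        codepipeline.put_job_failure_result(
            jobId=job_id,
            failureDetails={"type": "JobFailed", "message": str(exc)},
        )
        raise

def build_deployment_package():
    # Placeholder for the actual build steps (install dependencies, compile, package)
    return "/tmp/app.zip"
Reporting success or failure back to CodePipeline is what allows the pipeline to either proceed to the Deploy stage or stop and surface the error. 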
REST APIs are so widespread that you can even use them to manage distributed SQL databases. In this blog post, I will show you how to manage and work with YugabyteDB Managed clusters via REST. You'll learn how to generate an API key, retrieve a list of existing clusters, create a new cluster, and finally, start a sample application that streams market orders into the cluster. Let's get started! Generating a YugabyteDB Managed API Key First, you'll need an API key to interact with the YugabyteDB Managed REST API. To generate a new key, follow these steps (you can also follow along with the sister video, YugabyteDB Managed REST API): Head over to the YugabyteDB Managed UI. Navigate to the API Keys section. Click on the Create API Key button. Give the key a name, select a role (e.g., 'Developer' or 'Admin'), and define an expiration time. Click on the Generate API Key button. Once the key is generated, copy it somewhere safe, since you will not be able to see it again. You need to use this API key in the Authorization: Bearer header for all your requests to YugabyteDB Managed. If you use httpie, then your first REST request can be as simple as this: Shell http GET https://cloud.yugabyte.com/api/public/v1/accounts \ -A bearer -a {YOUR_API_KEY} Retrieve a List of Existing YugabyteDB Clusters Next, let's run more sophisticated requests via REST. How about getting a list of YugabyteDB Managed clusters? To fetch that list, you need to make an API call to the clusters endpoint: Shell http GET https://cloud.yugabyte.com/api/public/v1/accounts/{YOUR_ACCOUNT_ID}/projects/{YOUR_PROJECT_ID}/clusters \ -A bearer -a {YOUR_API_KEY} For this and many other API endpoints, you'll need to provide the accountID and the projectID. You can find these values in the URL of your YugabyteDB Managed UI. After making this API call to the clusters endpoint, you'll get a JSON response containing the list of database clusters. If the data field is empty, it means there are no database clusters yet. JSON { "_metadata": { "continuation_token": null, "links": { "next": null, "self": "/api/public/v1/accounts/{YOUR_ACCOUNT_ID}/projects/{YOUR_PROJECT_ID}/clusters" } }, "data": [] } Create a YugabyteDB Managed Cluster To create a new YugabyteDB Managed cluster, you'll need to make a POST HTTP request to the clusters endpoint and provide the necessary cluster configuration details. Here's an example request for provisioning a new distributed YugabyteDB cluster: Shell http POST https://cloud.yugabyte.com/api/public/v1/accounts/{YOUR_ACCOUNT_ID}/projects/{YOUR_PROJECT_ID}/clusters \ -A bearer -a {YOUR_API_KEY} < cluster_config.json The cluster_config.json file contains the required cluster configuration in JSON format. 
The configuration below requests a 3-node cluster in the us-east1 region of the Google Cloud Platform: JSON { "cluster_spec": { "name": "sample-cluster", "cluster_info": { "cluster_tier": "PAID", "cluster_type": "SYNCHRONOUS", "num_nodes": 3, "fault_tolerance": "ZONE", "node_info": { "num_cores": 2, "memory_mb": 8192, "disk_size_gb": 50 }, "is_production": false, "version": null }, "software_info": { "track_id": "d9618d5e-9591-445b-a280-705770f5fb30" }, "cluster_region_info": [ { "placement_info": { "cloud_info": { "code": "GCP", "region": "us-east1" }, "num_nodes": 3, "multi_zone": true }, "is_default": true, "is_affinitized": true, "accessibility_types": [ "PUBLIC" ] } ] }, "db_credentials": { "ycql": { "username": "admin", "password": "SuperStrongPassword" }, "ysql": { "username": "admin", "password": "SuperStrongPassword" } } } Once you've sent this POST request, your new cluster will start to provision. JSON "cluster_endpoints": [{ "accessibility_type": "PUBLIC", "host": null, "id": "5e24c4d3-3329-47e1-ba95-5f1fa36a86f0", "pse_id": null, "region": "us-east1", "state": "CREATING", "state_v1": "CREATING" }] You can continue checking the cluster status using the REST API, or you can wait for the deployment to finish in the YugabyteDB Managed UI. Start a Sample Application and Monitor Cluster Metrics Now that you have your YugabyteDB Managed cluster up and running, let's start a sample application that streams market orders into the cluster. Once you start the application, make sure it is connected to the cluster and streams market orders into the database as expected: Java ============= Trade Stats ============ Trades Count: 1040 Stock Total Proceeds Linen Cloth 2815.790039 Google 2201.330078 Bespin Gas 2132.620117 Apple 1536.520020 Elerium 1189.439941 ====================================== Next, let's use the YugabyteDB Managed REST API to confirm the incoming requests using the cluster-metrics API endpoint: Shell http GET https://cloud.yugabyte.com/api/public/v1/accounts/{YOUR_ACCOUNT_ID}/projects/{YOUR_PROJECT_ID}/cluster-metrics?cluster_ids={YOUR_CLUSTER_ID} \ -A bearer -a {YOUR_API_KEY} cluster_ids parameter: You can find your cluster's ID by sending a GET request to the clusters API endpoint (see the section on retrieving the clusters list above) or by locating it in the YugabyteDB Managed UI. If everything is working as expected, you'll see your cluster's metrics data, including the number of active read and write requests. JSON { "data": [ { "cluster_id": "bf7e8c4e-3540-4afb-a382-1b87d31cc0d8", "metrics": { "read_ops_per_second": 5.0, "write_ops_per_second": 10.0 } } ] } Congratulations! You now know how to use the YugabyteDB Managed REST API to generate an API key, retrieve a list of existing clusters, create a new cluster, and monitor the cluster metrics while streaming market orders from a sample application. You can now take full advantage of YugabyteDB Managed and its REST API to manage your database clusters more efficiently and programmatically.
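If you prefer to script the status checks mentioned above rather than calling httpie by hand, a small Python sketch along these lines could poll the cluster until it becomes active. The account, project, and cluster IDs are placeholders, and the field path for the state is an assumption based on the responses shown above, so adjust it to match what the API actually returns.
Python
import time
import requests

API_BASE = "https://cloud.yugabyte.com/api/public/v1"
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}   # placeholder API key

def wait_until_active(account_id, project_id, cluster_id, poll_seconds=30):
    url = f"{API_BASE}/accounts/{account_id}/projects/{project_id}/clusters/{cluster_id}"
    while True:
        cluster = requests.get(url, headers=HEADERS).json()
        # Assumed field path; verify against the real response payload
        state = cluster.get("data", {}).get("info", {}).get("state", "UNKNOWN")
        print(f"Cluster state: {state}")
        if state == "ACTIVE":
            return
        time.sleep(poll_seconds)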
I explained the concepts and theory behind Data Residency in a previous post. It's time to get our hands dirty and implement it in a simple demo. The Sample Architecture In the last section of the previous post, I proposed a sample architecture where location-based routing happened at two different stages: The API Gateway checks for an existing X-Country header. Depending on its value, it forwards the request to the computed upstream; if no value is found or no value matches, it forwards it to a default upstream. The application uses Apache ShardingSphere to route again, depending on the data. If the value computed by the API Gateway is correct, the flow stays "in its lane"; if not, it's routed to the correct database, but with a performance penalty as it's outside its lane. I simplified some aspects: The theory uses two API Gateway instances. For the demo, I used only one. Remember that the location isn't set client-side on the first request. It should be returned along with the first response, stored, and reused by the client on subsequent calls. I didn't bother with implementing the client. I like my demos to be self-contained, so I didn't use any cloud provider. Here's the final component diagram: The data model is simple: We insert location-specific data on each database: SQL INSERT INTO europe.owner VALUES ('dujardin', 'fr', 'Jean Dujardin'); INSERT INTO europe.thingy VALUES (1, 'Croissant', 'dujardin'); INSERT INTO usa.owner VALUES ('wayne', 'us', 'John Wayne'); INSERT INTO usa.thingy VALUES (2, 'Lasso', 'wayne'); Finally, we develop a straightforward RESTful API to fetch thingies: GET /thingies/ GET /thingies/{id} Now that we have set the stage, let's check how to implement routing at the two levels. Routing on Apache ShardingSphere Apache ShardingSphere offers two approaches: as a library inside the application, ShardingSphere-JDBC, or as a full-fledged deployable component, ShardingSphere-Proxy. You can also combine both. I chose the former because it's the easiest to set up. For a comparison between them, please check this table. The first step is to add the dependency to the POM: XML <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>shardingsphere-jdbc-core</artifactId> <version>5.3.2</version> </dependency> ShardingSphere-JDBC acts as an indirection layer between the application and the data sources. We must configure the framework to use it. For Spring Boot, it looks like the following: YAML spring: datasource: driver-class-name: org.apache.shardingsphere.driver.ShardingSphereDriver #1 url: jdbc:shardingsphere:absolutepath:/etc/sharding.yml #2-3 JDBC-compatible ShardingSphere driver Configuration file Contrary to what the documentation says, the full prefix is jdbc:shardingsphere:absolutepath. I've opened a PR to fix the documentation. 
The next step is to configure ShardingSphere itself with the data sources: YAML dataSources: #1 europe: dataSourceClassName: com.zaxxer.hikari.HikariDataSource driverClassName: org.postgresql.Driver jdbcUrl: "jdbc:postgresql://dbeurope:5432/postgres?currentSchema=europe" username: postgres password: root usa: dataSourceClassName: com.zaxxer.hikari.HikariDataSource driverClassName: org.postgresql.Driver jdbcUrl: "jdbc:postgresql://dbusa:5432/postgres?currentSchema=usa" username: postgres password: root rules: #2 - !SHARDING tables: owner: #3 actualDataNodes: europe.owner,usa.owner #4 tableStrategy: standard: shardingColumn: country #3 shardingAlgorithmName: by_country #5 shardingAlgorithms: by_country: type: CLASS_BASED #6 props: strategy: STANDARD algorithmClassName: ch.frankel.blog.dataresidency.LocationBasedSharding #7 Define the two data sources, europe and usa Define rules. Many rules are available; we will only use sharding to split data between Europe and USA locations Sharding happens on the country column of the owner table Actual shards Algorithm to use. ShardingSphere offers a couple of algorithms out-of-the-box, which generally try to balance data equally between the sources.As we want a particular split, we define our own Set the algorithm type Reference the custom algorithm class The final step is to provide the algorithm's code: Kotlin class LocationBasedSharding : StandardShardingAlgorithm<String> { //1 override fun doSharding(targetNames: MutableCollection<String>, shardingValue: PreciseShardingValue<String>) = when (shardingValue.value) { //2 "fr" -> "europe" "us" -> "usa" else -> throw IllegalArgumentException("No sharding over ${shardingValue.value} defined") } } Inherit from StandardShardingAlgorithm, where T is the data type of the sharding column. Here, it's country Based on the sharding column's value, return the name of the data source to use With all of the above, the application will fetch thingies in the relevant data source based on the owner's country. Routing on Apache APISIX We should route as early as possible to avoid an application instance in Europe fetching US data. In our case, it translates to routing at the API Gateway stage. I'll use APISIX standalone mode for configuration. Let's define the two upstreams: YAML upstreams: - id: 1 nodes: "appeurope:8080": 1 - id: 2 nodes: "appusa:8080": 1 Now, we shall define the routes where the magic happens: YAML routes: - uri: /thingies* #1 name: Europe upstream_id: 1 vars: [["http_x-country", "==", "fr"]] #2 priority: 3 #3 - uri: /thingies* #4 name: USA upstream_id: 2 vars: [["http_x-country", "==", "us"]] priority: 2 #3 - uri: /thingies* #5 name: default upstream_id: 1 priority: 1 #3 Define the route to the Europe-located app APISIX matches the HTTP methods, the URI, and the conditions. Here, the condition is that the X-Country header has the fr value APISIX evaluates matching in priority order, starting with the highest priority. If the request doesn't match, e.g., because the header doesn't have the set value, it evaluates the next route in the priority list. Define the route to the USA-located app Define a default route The first request carries no header; APISIX forwards it to the default route, where ShardingSphere finds data in the relevant data source. Subsequent requests set the X-Country header because the response to the first request carries the information, and the client has stored it. Remember that it's outside the scope of the demo. 
In most cases, it's set to the correct location; hence, the request will stay "in its lane." If not, the configured routing will still find the data in the appropriate location, at the cost of increased latency to fetch data from the other lane. Observing the Flow in Practice It's always a good idea to check that the design behaves as expected. We can use OpenTelemetry for this. For more information on how to set up OpenTelemetry in such an architecture, please refer to End-to-end tracing with OpenTelemetry. Note that Apache ShardingSphere supports OpenTelemetry but doesn't provide the binary agent. You need to build it from source; I admit I was too lazy to do it. Let's start with a headerless request: Shell curl localhost:9080/thingies/1 It uses the default route defined in APISIX and returns the correct data, thanks to ShardingSphere. Now, let's set the country to fr, which is correct: Shell curl -H 'X-Country: fr' localhost:9080/thingies/1 APISIX correctly forwards the request to the Europe-located app. Finally, imagine a malicious actor changing the header to get their hands on data located in the US: Shell curl -H 'X-Country: us' localhost:9080/thingies/1 APISIX forwards the request to the USA-located app according to the header. However, ShardingSphere still fetches the data from Europe. Conclusion In the previous post, I explained the concepts behind Data Residency. In this post, I implemented it within a simple architecture, thanks to Apache APISIX and Apache ShardingSphere. The demo simplifies reality but should be an excellent foundation for building your production-grade Data Residency architecture. The complete source code for this post can be found on GitHub. To go further: Apache ShardingSphere Sharding YAML configuration How to filter route by Nginx builtin variable
I was recently involved in the TypeScript migration of the ZK Framework. For those who are new to ZK, ZK is the Java counterpart of the Node.js stack; i.e., ZK is a Java full-stack web framework where you can implement event callbacks in Java and control the frontend UI with Java alone. Over more than a decade of development and expansion, we have reached a code base of more than 50K lines of JavaScript and over 400K lines of Java, but we noticed that we are spending almost the same amount of time and effort maintaining the Java and JavaScript code, which means that, in our project, JavaScript is 8 times harder to maintain than Java. I would like to share the reason we made the move to migrate from JavaScript to TypeScript, the options we evaluated, how we automated a large part of the migration, and how it changed the way we work and gave us confidence. The Problem ZK has been a server-centric solution for more than a decade. In recent years, we noticed the need for cloud-native support and have made this the main goal of our upcoming new version, ZK 10. The new feature will alleviate servers' burden by transferring much of the model-view-viewmodel bindings to the client side so that the server side becomes as stateless as possible. This brings benefits such as reduced server memory consumption, simplified load balancing for ZK 10 clustered backends, and potentially easier integration with other frontend frameworks. We call this effort "Client MVVM." However, this implies huge growth of the JavaScript code. As we are already aware that JavaScript is harder to maintain, it is high time that we made our JavaScript codebase easier to work with at 50k lines of code. Otherwise, extending the existing JavaScript code with the whole MVVM stack will become Sisyphean, if not impossible. We started to look at why Java has higher productivity and how we can bring the same productivity to our client side. Why Does Java Beat JavaScript at Large-Scale Development? What did Java get right to give us an 8x boost in productivity? We conclude that the availability of static analysis is the primary factor. We design and write programs long before programs are executed and often before compilation. Normally, we refactor, implement new features, and fix bugs by modifying source code instead of modifying the compiler-generated machine code or the memory of the live program. That is, programmers analyze programs statically (before execution) as opposed to dynamically (during execution). Not only is static analysis more natural to humans, but it is also easier to automate. Nowadays, compilers not only generate machine code from source code but also perform the sort of analysis that humans would do on source code, like name resolution, initialization guards, dead-code analysis, etc. Humans can still perform static analysis on JavaScript code. However, without the help of automated static analyzers (compilers and linters), reasoning about JavaScript code becomes extremely error-prone and time-consuming. For example, what value does the following JavaScript function return?
JavaScript
function f() {
  return
    1
}
It's actually undefined instead of 1. Surprised? Compare this with Java, where we have the compiler to aid our reasoning "as we type." With TypeScript, the compiler performs "automatic semicolon insertion" analysis followed by dead-code analysis, yielding an unreachable-code warning for the orphaned 1. Humans can never beat the meticulousness of machines. 
By delegating this sort of monotonous but critical tasks to machines, we can free up a huge amount of time while achieving unprecedented reliability. How Can We Enable Static Analysis for JavaScript? We evaluated the following 6 options and settled on TypeScript due to its extensive ECMA standard conformance, complete support for all mainstream JS module systems, and massive ecosystem. We provide a comparison of them at the end of the article. Here is a short synopsis. Google’s Closure Compiler: All types are specified in JSDoc, thereby bloating code and making inline type assertion very clumsy Facebook’s Flow: A much smaller ecosystem in terms of tooling and libraries compared to TypeScript Microsoft’s TypeScript: The most mature and complete solution Scala.js: Subpar; emitted JavaScript code ReScript: Requires a paradigm shift to purely functional programming; otherwise, very promising Semi-Automated Migration to TypeScript Prior to the TypeScript migration, our JavaScript code largely consisted of prototype inheritance via our ad-hoc zk.$extends function, as shown on the left-hand side. We intend to transform it to the semantically equivalent TypeScript snippet on the right-hand side. JavaScript Module.Class = zk.$extends(Super, { field: 1, field_: 2, _field: 3, $define: { field2: function () { // Do something in setter. }, }, $init: function() {}, method: function() {}, method_: function() {}, _method: function() {}, }, { staticField: 1, staticField_: 2, _staticField: 3, staticMethod: function() {}, staticMethod_: function() {}, _staticMethod: function() {}, }); TypeScript export namespace Module { @decorator('meta-data') export class Class extends Super { public field = 1; protected field_ = 2; private _field = 3; private _field2?: T; public getField2(): T | undefined { return this._field2; } public setField2(field2: T): this { const old = this._field2; this._field2 = field2; if (old !== field2) { // Do something in setter. } return this; } public constructor() { super(); } public method() {} protected method_() {} private _method() {} public static staticField = 1; protected static staticField_ = 2; private static _staticField = 3; public static staticMethod() {} protected static staticMethod_() {} private static _staticMethod() {} } } There are hundreds of such cases among which many have close to 50 properties. If we were to rewrite manually, it would not only take a very long time but be riddled with typos. Upon closer inspection, the transformation rules are quite straightforward. It should be subject to automation! Then, the process would be fast and reliable. Indeed, it is a matter of parsing the original JavaScript code into an abstract syntax tree (AST), modifying the AST according to some specific rules, and consolidating the modified AST into formatted source code. Fortunately, there is jscodeshift that does the parsing and consolidation of source code and provides a set of useful APIs for AST modification. Furthermore, there is AST Explorer that acts as a real-time IDE for jscodeshift so we can develop our jscodeshift transformation script productively. Better yet, we can author a custom typescript-eslint rule that spawns the jscodeshift script upon the presence of zk.$extends. Then, we can automatically apply the transformation to the whole codebase with the command eslint --fix. Let’s turn to the type T in the example above. 
Since jscodeshift presents us with the lossless AST (including comments), we can author a visitor that extracts the @return JSDoc of getter() if it can be found; if not, we can let the visitor walk into the method body of getter() and try to deduce the type T, e.g., deduce T to be string if the return value of getter() is the concatenation of this._field2 with some string. If that still fails, specify T as void, so that after jscodeshift is applied, the TypeScript compiler will warn us about a type mismatch. This way, we can perform as much automated inference as possible before manual intervention, and the sections requiring manual inspection will be accurately surfaced by the compiler due to our fault injection. Besides whole-file transformations like jscodeshift that can only run in batch mode, the typescript-eslint project allows us to author small and precise rules that update source code in an IDE, like VSCode, in real time. For instance, we can author a rule that marks properties of classes or namespaces that begin or end with single underscores as @internal, so that documentation extraction tools and type definition bundlers can ignore them: TypeScript export namespace N { export function _helper() {} export class A { /** * Description ... */ protected doSomething_() {} } } TypeScript export namespace N { /** @internal */ export function _helper() {} export class A { /** * Description ... * @internal */ protected doSomething_() {} } } Regarding the example above, one would have to determine the existence of property-associating JSDoc, the pre-existence of the @internal tag, and the position at which to insert the @internal tag if missing. Since typescript-eslint also presents us with a lossless AST, it is easy to find the associated JSDoc of class or namespace properties. The only non-trivial task left is to parse, transform, and consolidate JSDoc fragments. Fortunately, this can be achieved with the TSDoc parser. Similar to activating jscodeshift via typescript-eslint in the first example, this second example is a case of delegating JSDoc transformation to the TSDoc parser upon a typescript-eslint rule match. With sufficient knowledge of JavaScript, TypeScript, and their build systems, one can utilize jscodeshift, typescript-eslint, AST Explorer, and the TSDoc parser to make further semantic guarantees about one's codebase, and whenever possible, automate the fix with the handy eslint --fix command. The importance of static analysis cannot be emphasized enough! Bravo! ZK 10 Has Completely Migrated to TypeScript For ZK 10, we have applied static analysis with TypeScript to all existing JavaScript code in our codebase. Not only were we able to fix existing errors (some automatically with eslint --fix), but, thanks to the typescript-eslint project and its many extra type-aware rules, we also wrote our own rules, so we are guaranteed never to make those mistakes again. This means less mental burden and a better conscience for the ZK development team. Our Client MVVM effort also becomes much more manageable with TypeScript in place. The development experience is close to that of Java. In fact, some aspects are even better, as TypeScript has better type narrowing, structural typing, refinement types via literal types, and intersection/union types. As for our users, ZK 10 has become more reliable. Furthermore, our type definitions are freely available, so ZK 10 users can customize the ZK frontend components with ease and confidence. 
In addition, users can scale their applications during execution with Client MVVM. Adopting TypeScript in ZK 10 further enables us to scale correctness during development. Both are fundamental improvements. Annex: Comparing Static Typing Solutions for JavaScript Google’s Closure Compiler Type system soundness unknown; Assumed as unsound, as sound type systems are rare @interface denotes nominal types whereas @record denotes structural types All type annotations are specified in comments leading to code bloat, and comments often go out of sync with the code. Most advanced and aggressive code optimization among all options listed here Find more information on GitHub Facebook’s Flow Unsound type system Nominal types for ES6 classes and structural types for everything else, unlike TypeScript where all types are structural; whereas in Java, all types are nominal Compared to TypeScript, Flow has a much smaller ecosystem in terms of tooling (compatible formatter, linter, IDE plugin) and libraries (TypeScript even has the DefinitelyTyped project to host type definitions on NPM) Find more information in Flow Documentation Microsoft’s TypeScript Supports all JavaScript features and follows the ECMA standard closely even for subtleties: class fields and TC39 decorators Seamless interoperation between all mainstream JavaScript module systems: ES modules, CommonJS, AMD, and UMD Unsound type system All types are structural, which is the most natural way to model dynamic types statically, but the ability to mark certain types as nominal would be good to have. Flow and the Closure Compiler have an edge in this respect. Also supports Closure-Compiler-style type annotations in comments Best-in-class tooling and a massive ecosystem; built-in support by VSCode; hence, its availability is almost ubiquitous Each enum variant is a separate subtype, unlike all other type systems we have ever encountered, including Rust, Scala 3, Lean 4, and Coq Find more information in The TypeScript Handbook Scala.js Leverages the awesome type system of Scala 3, which is sound Seamlessly shares build scripts (sbt) and code with any Scala 3 project The emitted JavaScript code is often bloated and sometimes less efficient than that of the Closure Compiler, Flow, and TypeScript. Learn more on the Scala.js site ReScript Touted to have a sound type system (where is the proof?) like that of Scala 3, but the syntax of ReScript is closer to JavaScript and OCaml The type system is highly regular like all languages in the ML family, allowing for efficient type checking, fast JavaScript emission, and aggressive optimizations. The emitted JavaScript code is very readable. This is a design goal of ReScript. Interoperation with TypeScript via genType As of ReScript 10.1, async/await is supported. Might require familiarity with more advanced functional programming techniques and purely functional data structures Learn more in the ReScript Language Manual documentation
Sparkplug is an industrial IoT communication protocol designed for use in SCADA systems. It provides a standard communication format for industrial devices and applications, making devices from different manufacturers interoperable. The Sparkplug specification was developed by Cirrus Link Solutions and Eclipse Foundation. It is openly available and not proprietary to a single company. So, it has the following benefits for the Sparkplug community: Allowing different systems and technologies to work together seamlessly, improving efficiency, reducing costs, and providing more options to consumers. Ensuring that products from different vendors can work together without any compatibility issues, increasing consumer choice, and fostering healthy competition among vendors. Encouraging innovation by enabling collaboration and sharing ideas and solutions, thus leading to the development of new products, services, and technologies. Promoting transparency, increasing trust, and reducing the risk of vendor lock-in or dependence on a single supplier. Ensuring that products and services are accessible to a wide range of users, including those with disabilities. Sparkplug aims to provide a standardized way to use MQTT for industrial applications and promote interoperability between devices and systems from different vendors. As such, the Sparkplug specification has been widely adopted by the industrial IoT community and is supported by many different vendors and organizations. 1. MQTT Messaging Architecture The Sparkplug specification is based on the MQTT protocol, a lightweight messaging protocol widely used in IoT applications. It is designed for low-bandwidth, high-latency networks and is adopted in IoT applications due to the following capabilities. Lightweight: MQTT is a lightweight protocol requiring minimal network bandwidth and is well-suited for low-bandwidth environments. Reliability: MQTT includes support for Quality of Service (QoS) levels, which ensure that messages are delivered reliably even in the face of network failures or intermittent connectivity. Scalability: MQTT is designed to be scalable and can support millions of devices and clients. Flexibility: MQTT can be used for one-to-one and one-to-many communication and supports both publish/subscribe and request/response messaging patterns. Security: MQTT includes support for security features such as authentication and encryption, which help to ensure that data is transmitted securely and confidentially. Overall, using MQTT for implementing the Sparkplug specification provides a range of benefits that make it well-suited for industrial IoT applications. Most importantly, based on MQTT pub-sub messaging architecture, the Sparkplug system supports the decoupling of data producers and consumers. This approach enables a more flexible and scalable data exchange process, as the data producer and consumer can operate independently. 2. Session State Awareness Decoupling can offer many benefits regarding scalability, flexibility, and resilience. Still, it requires careful attention to session management to ensure that the system maintains a coherent state across multiple requests and components. Session state awareness is an essential feature of Sparkplug that allows devices to maintain a connection to a broker even when the network connection is interrupted or lost. This is accomplished through the use of session state information that is stored by the broker and used to re-establish communication when the connection is restored. 
When a device connects to a Sparkplug broker, it establishes a session with the broker. During this session, the device can publish and subscribe to messages. The broker records the device's session state, including any subscriptions or messages not delivered due to a network interruption. Session state awareness is a critical feature for mission-critical industrial IoT applications that require high availability and reliable communication. By maintaining session state information, Sparkplug ensures that devices can quickly re-establish communication with the broker after a network interruption, reducing the risk of downtime and data loss. 3. Unified Namespace The unified namespace is a concept that refers to the ability of different devices and systems in an industrial setting to share data seamlessly, regardless of their manufacturer or communication protocol. It uses a standard naming convention and data model to ensure interoperability and facilitate data exchange. On the other hand, Sparkplug is a messaging specification to enable efficient and secure communication between IoT devices and applications. It is based on the MQTT protocol and incorporates the unified namespace concept to provide a standardized way of representing data and metadata across different devices and systems. In other words, Sparkplug utilizes the unified namespace concept to provide a common language for data exchange between industrial devices and systems. This helps to simplify integration and improve interoperability, making it easier to build and maintain complex IoT applications in industrial environments. 4. Central Data Repository In the context of the Sparkplug specification, a Central Data Repository (CDR) is a centralized server or platform that acts as a hub for receiving, processing, and distributing data from different industrial devices and applications. The CDR provides a standardized way to manage and store data in a scalable and efficient way and enables interoperability between different devices and connected applications. The benefits of a CDR include the following: Improved data quality: A CDR ensures that all data is standardized and consistent across an organization, improving the accuracy and reliability of the data. Simplified data management: With all data stored in one place, it's easier to manage and maintain. Faster access to data: A CDR provides a centralized location for data, making it easier and faster to access and analyze. Reduced data redundancy: By eliminating redundant data, a CDR reduces storage costs and minimizes the risk of data inconsistencies. The CDR is responsible for receiving MQTT messages from different devices and applications, parsing and validating the data, and storing it in a format easily accessed and processed by other systems. It also provides a set of APIs and interfaces that enable other systems to access and retrieve the data stored in the CDR, allowing it to send commands or instructions back to the devices and applications connected to it. 5. Single Source of Truth Single source of truth (SSOT) is a concept commonly used in information management and refers to the idea that there should be one authoritative source of data for a particular piece of information. All data related to a specific topic, such as a customer order, product information, or production details, should be stored in a single location and maintained consistently. 
Using an SSOT is also a key aspect of the Sparkplug specification, as it provides a standardized way to manage and store data consistently and reliably. Having a single source of truth makes it easier to ensure that all systems and applications have access to the most up-to-date and accurate information, which is essential for maintaining the integrity and reliability of the system. In practice, the SSOT is often implemented as part of the CDR, which is responsible for receiving and processing data from different devices and applications. The CDR stores all data in a standardized format that can be easily accessed and processed by other systems and serves as the central point of control for managing and monitoring different devices and systems. By using a centralized SSOT, it is possible to achieve a high degree of interoperability between different devices and systems and to ensure that all systems have access to the same data and information. Conclusion In the Sparkplug specification, an MQTT broker is an indispensable component for delivering the five conceptual capabilities above. First, the MQTT broker offers a decoupled pub-sub messaging architecture that lets the various Sparkplug host systems and devices operate independently. Second, the MQTT broker provides a Last Will mechanism to support session state awareness between Sparkplug host systems and devices. Third, the Sparkplug specification defines the payload message standard and the topic namespace in the MQTT broker, as required by the unified namespace. Fourth, the MQTT broker serves as a central data repository that receives messages from devices and forwards them to the host recipients. Finally, the MQTT broker stores the most up-to-date and accurate industrial information to maintain a single source of truth for the entire system. That's why an MQTT broker is the right choice for Sparkplug.
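As a minimal illustration of the Last Will mechanism and the Sparkplug topic namespace discussed above, here is a sketch using the paho-mqtt 1.x client. The broker address, group, and edge node IDs are placeholders, and the JSON payloads are deliberate simplifications: real Sparkplug B payloads are Protobuf-encoded.
Python
import json
import paho.mqtt.client as mqtt

GROUP_ID = "plant1"        # hypothetical Sparkplug group
EDGE_NODE_ID = "line4"     # hypothetical edge node

client = mqtt.Client(client_id="line4-node")
# Death certificate: the broker delivers this Last Will if the node disconnects ungracefully,
# which is how Sparkplug keeps consumers aware of the session state
client.will_set(
    f"spBv1.0/{GROUP_ID}/NDEATH/{EDGE_NODE_ID}",
    payload=json.dumps({"bdSeq": 0}),
    qos=1,
)
client.connect("broker.example.com", 1883)   # placeholder broker address
client.loop_start()
# Birth certificate announces the node and its metrics within the unified namespace
info = client.publish(
    f"spBv1.0/{GROUP_ID}/NBIRTH/{EDGE_NODE_ID}",
    payload=json.dumps({"bdSeq": 0, "metrics": [{"name": "temperature", "value": 21.5}]}),
    qos=0,
)
info.wait_for_publish()
client.loop_stop()
client.disconnect()
Even in this simplified form, the topic structure (spBv1.0/group/message_type/edge_node) and the registered death certificate show how the broker, rather than the individual devices, carries the session state and the unified namespace for the whole system.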