Compare commits

..

3 Commits

Author SHA1 Message Date
rony5394
0f316e8149 Some cleanup. 2026-05-25 15:39:14 +02:00
rony5394
ef37f17378 Added more proper waiting mechanism. 2026-05-24 13:13:27 +02:00
rony5394
778bab644f Switched originalScale storage 2026-05-19 14:20:22 +02:00
8 changed files with 130 additions and 52 deletions

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"os" "os"
"time"
"github.com/docker/docker/api/types/registry" "github.com/docker/docker/api/types/registry"
); );
@@ -24,6 +25,9 @@ type Config struct {
HelperServiceName string HelperServiceName string
StorageContainerName string StorageContainerName string
PrepullImageServiceName string PrepullImageServiceName string
ServiceScaleTimeout time.Duration
SSHClientPKConfigName string
SSHHostSKSecretName string
} }
} }
@@ -45,6 +49,9 @@ func GetConfig()(Config, error){
cfg.Constants.HelperServiceName = "blazenaHelper"; cfg.Constants.HelperServiceName = "blazenaHelper";
cfg.Constants.StorageContainerName = "blazenaStorage"; cfg.Constants.StorageContainerName = "blazenaStorage";
cfg.Constants.PrepullImageServiceName = "blazenaPrepull"; cfg.Constants.PrepullImageServiceName = "blazenaPrepull";
cfg.Constants.ServiceScaleTimeout = time.Second * 15;
cfg.Constants.SSHClientPKConfigName = "blazenaSSHClientPublicKey";
cfg.Constants.SSHHostSKSecretName = "blazenaSSHHostPrivateKey";
err = json.Unmarshal(rawConfig, &cfg); err = json.Unmarshal(rawConfig, &cfg);

View File

@@ -2,10 +2,10 @@ package docker
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"io" "log/slog"
"net/http" "net/http"
"os"
"time" "time"
"github.com/docker/docker/api/types/swarm" "github.com/docker/docker/api/types/swarm"
@@ -20,44 +20,40 @@ func cleanup(w http.ResponseWriter, r *http.Request){
if !bearerAuth(w, r) {return;} if !bearerAuth(w, r) {return;}
rawBody, err := io.ReadAll(r.Body);
if err != nil {
panic("Failed to read body!");
}
var bodyDecoded struct{
ServiceId string `json:"serviceId"`
};
err = json.Unmarshal(rawBody, &bodyDecoded);
if err != nil {
panic("Failed to unmarshal json."+ err.Error());
}
listResoult, err := ApiClient.ServiceList(context.Background(), swarm.ServiceListOptions{}); listResoult, err := ApiClient.ServiceList(context.Background(), swarm.ServiceListOptions{});
if err != nil { if err != nil {
panic("Failed to list services."+ err.Error()); slog.Error("Failed to list services", slog.Any("propagatedError", err));
os.Exit(1);
} }
var helperServiceId string; var helperServiceId string;
var helperServices int;
for _, service := range listResoult{ for _, service := range listResoult{
if service.Spec.Labels["blazena.helper"] != "true" { if service.Spec.Labels["blazena.helper"] != "true" {
continue; continue;
} }
helperServiceId = service.ID; helperServiceId = service.ID;
break; helperServices ++;
} }
if helperServiceId == ""{ if helperServiceId == ""{
panic("Helper service not found!"); slog.Warn("Helper service wasn't found");
http.Error(w, "Internal Server Error", http.StatusInternalServerError);
return;
}
if helperServices > 1{
slog.Error("There are more than 1 helper service.");
os.Exit(1);
} }
err = ApiClient.ServiceRemove(context.Background(), helperServiceId); err = ApiClient.ServiceRemove(context.Background(), helperServiceId);
if err != nil { if err != nil {
panic("Failed to remove helper service."+ err.Error()); panic("Failed to remove helper service."+ err.Error());
} }
//TODO: add proper wait system
time.Sleep(7*time.Second); time.Sleep(7*time.Second);
fmt.Fprint(w, bodyDecoded.ServiceId); fmt.Fprint(w, helperServiceId);
} }

View File

@@ -8,7 +8,6 @@ import (
"os" "os"
"os/signal" "os/signal"
"strings" "strings"
"sync"
"syscall" "syscall"
"time" "time"
@@ -23,7 +22,6 @@ import (
// Add mutex. // Add mutex.
var ApiClient *client.Client; var ApiClient *client.Client;
var scale sync.Map;
var token string = "12345"; var token string = "12345";
var theConfig cfg.Config; var theConfig cfg.Config;

View File

@@ -1,11 +1,14 @@
package docker package docker
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log/slog"
"net/http" "net/http"
"context" "os"
"github.com/docker/docker/api/types/swarm" "github.com/docker/docker/api/types/swarm"
"github.com/rony5394/blazena/shared" "github.com/rony5394/blazena/shared"
@@ -32,23 +35,35 @@ func exchangeKeys(w http.ResponseWriter, r *http.Request){
if err != nil { if err != nil {
panic("Failed to unmarshal json."+ err.Error()); panic("Failed to unmarshal json."+ err.Error());
} }
sshPkPem := bodyDecoded.SshPkPem; sshClientPkPem := bodyDecoded.SshPkPem;
hostKeypair := shared.GenerateSSHKeypair(); hostKeypair := shared.GenerateSSHKeypair();
encoded, err := json.Marshal(struct{HostPkPem string `json:"hostPkPem"`}{HostPkPem: hostKeypair.Public}); encoded, err := json.Marshal(struct{HostPkPem string `json:"hostPkPem"`}{HostPkPem: hostKeypair.Public});
if err != nil { if err != nil {
panic("I wonder how. I wonder why?"+err.Error()); slog.Error("Failed to marshal host pk into response.", slog.Any("propagatedError", err));
os.Exit(42);
} }
ApiClient.ConfigCreate(context.Background(), swarm.ConfigSpec{ _, err = ApiClient.ConfigCreate(context.Background(), swarm.ConfigSpec{
Data: []byte(sshPkPem), Data: []byte(sshClientPkPem),
Annotations: swarm.Annotations{Name: "blazenaSSHPublicKey"}, Annotations: swarm.Annotations{Name: theConfig.Constants.SSHClientPKConfigName},
}); });
ApiClient.SecretCreate(context.Background(), swarm.SecretSpec{ if err != nil {
slog.Error("Failed to create a config.", slog.Any("propagatedError", err));
os.Exit(1);
}
_, err = ApiClient.SecretCreate(context.Background(), swarm.SecretSpec{
Data: []byte(hostKeypair.Private), Data: []byte(hostKeypair.Private),
Annotations: swarm.Annotations{Name: "blazenaSSHHostPrivateKey"}, Annotations: swarm.Annotations{Name: theConfig.Constants.SSHHostSKSecretName},
}); });
if err != nil {
slog.Error("Failed to create a secret.", slog.Any("propagatedError", err));
os.Exit(1);
}
fmt.Fprint(w, string(encoded)); fmt.Fprint(w, string(encoded));
} }

View File

@@ -53,6 +53,7 @@ func prepare(w http.ResponseWriter, r *http.Request){
pullBlazenaImage(); pullBlazenaImage();
createHelper(theConfig, labels["blazena.node"], bodyDecoded.VolumeId); createHelper(theConfig, labels["blazena.node"], bodyDecoded.VolumeId);
//TODO: add proper waiting system.
time.Sleep(7*time.Second); time.Sleep(7*time.Second);
fmt.Fprint(w, bodyDecoded.ServiceId); fmt.Fprint(w, bodyDecoded.ServiceId);
@@ -129,7 +130,7 @@ func createHelper(Config cfg.Config, targetNode string, targetVolume string){
stopGracePeriod := time.Second * 5; stopGracePeriod := time.Second * 5;
helperCommand := `/usr/sbin/sshd -h /host-key -p 2222 -D`; helperCommand := `/usr/sbin/sshd -h /host-key -p 2222 -D`;
sshKeyConfigId, err := getConfigIDByName(ApiClient, "blazenaSSHPublicKey"); sshKeyConfigId, err := getConfigIDByName(ApiClient, theConfig.Constants.SSHClientPKConfigName);
if err != nil { if err != nil {
panic("Docker needs both id and name to mount config for some reason and getting id of it failed!"+err.Error()); panic("Docker needs both id and name to mount config for some reason and getting id of it failed!"+err.Error());
@@ -163,7 +164,7 @@ func createHelper(Config cfg.Config, targetNode string, targetVolume string){
Configs: []*swarm.ConfigReference{ Configs: []*swarm.ConfigReference{
&swarm.ConfigReference{ &swarm.ConfigReference{
ConfigID: sshKeyConfigId, ConfigID: sshKeyConfigId,
ConfigName: "blazenaSSHPublicKey", ConfigName: theConfig.Constants.SSHClientPKConfigName,
File: &swarm.ConfigReferenceFileTarget{ File: &swarm.ConfigReferenceFileTarget{
Name: "/root/.ssh/authorized_keys", Name: "/root/.ssh/authorized_keys",
Mode: 0600, Mode: 0600,
@@ -175,7 +176,7 @@ func createHelper(Config cfg.Config, targetNode string, targetVolume string){
Secrets: []*swarm.SecretReference{ Secrets: []*swarm.SecretReference{
&swarm.SecretReference{ &swarm.SecretReference{
SecretID: sshHostKeySecretId, SecretID: sshHostKeySecretId,
SecretName: "blazenaSSHHostPrivateKey", SecretName: theConfig.Constants.SSHHostSKSecretName,
File: &swarm.SecretReferenceFileTarget{ File: &swarm.SecretReferenceFileTarget{
Name: "/host-key", Name: "/host-key",
Mode: 0600, Mode: 0600,

View File

@@ -3,7 +3,10 @@ package docker
import ( import (
"context" "context"
"io" "io"
"log/slog"
"net/http" "net/http"
"os"
"strconv"
"time" "time"
"encoding/json" "encoding/json"
@@ -53,16 +56,16 @@ func scaleDown(w http.ResponseWriter, r *http.Request){
newScale := uint64(0); newScale := uint64(0);
updatedSpec.Mode.Replicated.Replicas = &newScale; updatedSpec.Mode.Replicated.Replicas = &newScale;
updatedSpec.Labels["blazena.scaledDown"] = "true"; updatedSpec.Labels["blazena.scaledDown"] = "true";
updatedSpec.Labels["blazena.originalScale"] = strconv.FormatUint(*originalScale, 10);
scale.Store(serviceId, *originalScale);
_, err = ApiClient.ServiceUpdate(context.Background(), serviceId, inspectresoult.Version, updatedSpec, swarm.ServiceUpdateOptions{}); _, err = ApiClient.ServiceUpdate(context.Background(), serviceId, inspectresoult.Version, updatedSpec, swarm.ServiceUpdateOptions{});
if(err != nil){ if(err != nil){
panic("Failed to update service."+ err.Error()); panic("Failed to update service."+ err.Error());
} }
ctx, cancel := context.WithTimeout(context.Background(), theConfig.Constants.ServiceScaleTimeout);
defer cancel();
//TODO: Add proper wait system waitForScale(serviceId, ctx, 0);
time.Sleep(15 * time.Second);
} }
func scaleUp(w http.ResponseWriter, r *http.Request){ func scaleUp(w http.ResponseWriter, r *http.Request){
@@ -99,22 +102,67 @@ func scaleUp(w http.ResponseWriter, r *http.Request){
return; return;
} }
originalScale, ok := scale.Load(serviceId); originalScale := inspectresoult.Spec.Labels["blazena.originalScale"];
if(!ok){
if(originalScale == ""){
panic("Its not okay!"); panic("Its not okay!");
} }
originalScaleChecked, ok := originalScale.(uint64); originalScaleChecked, err := strconv.ParseUint(originalScale, 10, 64);
if(!ok){ if(err != nil){
panic("Its very not okay!") panic("Its very not okay!"+ err.Error())
} }
updatedSpec := inspectresoult.Spec; updatedSpec := inspectresoult.Spec;
updatedSpec.Mode.Replicated.Replicas = &originalScaleChecked; updatedSpec.Mode.Replicated.Replicas = &originalScaleChecked;
delete(updatedSpec.Labels, "blazena.scaledDown"); delete(updatedSpec.Labels, "blazena.scaledDown");
delete(updatedSpec.Labels, "blazena.originalScale");
ApiClient.ServiceUpdate(context.Background(), serviceId, inspectresoult.Version, updatedSpec, swarm.ServiceUpdateOptions{}); _, err = ApiClient.ServiceUpdate(context.Background(), serviceId, inspectresoult.Version, updatedSpec, swarm.ServiceUpdateOptions{});
//TODO: Add proper wait system if err != nil {
time.Sleep(15 * time.Second); slog.Error("Failed to update/scale a service.", slog.Any("propagatedError", err), slog.String("serviceId", serviceId));
os.Exit(1);
}
ctx, cancel := context.WithTimeout(context.Background(), theConfig.Constants.ServiceScaleTimeout);
defer cancel();
waitForScale(serviceId, ctx, int(originalScaleChecked));
}
func waitForScale(serviceId string, ctx context.Context, desiredCount int){
startTime := time.Now();
for ctx.Err() == nil {
tasks, err := ApiClient.TaskList(context.Background(), swarm.TaskListOptions{});
if err != nil {
slog.Error("Failed to list tasks.", slog.Any("propagatedError", err));
os.Exit(1);
}
var running int;
for _, task := range tasks {
if task.ServiceID != serviceId {
continue;
}
if task.Status.State == swarm.TaskStateRunning{
running ++;
}
}
if running == desiredCount {
slog.Debug("Rescaled Service",
slog.String("serviceId", serviceId),
slog.Any("took", time.Since(startTime)),
slog.Any("targetScale", desiredCount),
);
return;
}
time.Sleep(1*time.Second);
}
if ctx.Err() == context.DeadlineExceeded{
slog.Error("Failed to rescale service in given time.", slog.Any("serviceId", serviceId));
}
} }

14
main.go
View File

@@ -1,12 +1,16 @@
package main package main
import ( import (
"log/slog"
"os" "os"
"time"
cfg "github.com/rony5394/blazena/config"
"github.com/rony5394/blazena/docker" "github.com/rony5394/blazena/docker"
"github.com/rony5394/blazena/host" "github.com/rony5394/blazena/host"
cfg "github.com/rony5394/blazena/config" )
"log/slog"
); //TODO: consider adding blazena.doNotTouch
/* /*
If the exit code is X then it means Y: If the exit code is X then it means Y:
@@ -39,6 +43,8 @@ func main() {
slog.Debug("Config", slog.Any("Value", config)); slog.Debug("Config", slog.Any("Value", config));
startTime := time.Now();
mode := os.Args[1]; mode := os.Args[1];
switch mode { switch mode {
case "docker": case "docker":
@@ -53,4 +59,6 @@ func main() {
default: default:
panic("Invalid runtime mode!"); panic("Invalid runtime mode!");
} }
slog.Debug("Whole run took", slog.String("time", time.Since(startTime).String()));
} }

View File

@@ -4,6 +4,8 @@ import (
"crypto/ed25519" "crypto/ed25519"
"crypto/rand" "crypto/rand"
"encoding/pem" "encoding/pem"
"log/slog"
"os"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
) )
@@ -16,19 +18,22 @@ type Keypair struct {
func GenerateSSHKeypair() Keypair { func GenerateSSHKeypair() Keypair {
publicKey, privateKey, err := ed25519.GenerateKey(rand.Reader) publicKey, privateKey, err := ed25519.GenerateKey(rand.Reader)
if err != nil { if err != nil {
panic(err) slog.Error("Failed to generate an ssh keypair.", slog.Any("propagatedError", err));
os.Exit(42);
} }
privBlock, err := ssh.MarshalPrivateKey(privateKey, "") privBlock, err := ssh.MarshalPrivateKey(privateKey, "")
if err != nil { if err != nil {
panic(err) slog.Error("Failed to marshal private key", slog.Any("propagatedError", err));
os.Exit(42);
} }
privPem := pem.EncodeToMemory(privBlock) privPem := pem.EncodeToMemory(privBlock)
sshPubKey, err := ssh.NewPublicKey(publicKey) sshPubKey, err := ssh.NewPublicKey(publicKey)
if err != nil { if err != nil {
panic(err) slog.Error("Failed deriving public ssh key from a private one.", slog.Any("propagatedError", err));
os.Exit(42);
} }
pubBytes := ssh.MarshalAuthorizedKey(sshPubKey) pubBytes := ssh.MarshalAuthorizedKey(sshPubKey)